1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Custom types for SQLAlchemy.
 
  10 from __future__ import annotations
 
  11 from typing import Callable, Any, cast
 
  14 import sqlalchemy as sa
 
  15 from sqlalchemy.ext.compiler import compiles
 
  16 from sqlalchemy import types
 
  18 from nominatim.typing import SaColumn, SaBind
 
  22 class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
 
  23     """ Function to compute the spherical distance in meters.
 
  26     name = 'Geometry_DistanceSpheroid'
 
  30 @compiles(Geometry_DistanceSpheroid) # type: ignore[no-untyped-call, misc]
 
  31 def _default_distance_spheroid(element: Geometry_DistanceSpheroid,
 
  32                                compiler: 'sa.Compiled', **kw: Any) -> str:
 
  33     return "ST_DistanceSpheroid(%s,"\
 
  34            " 'SPHEROID[\"WGS 84\",6378137,298.257223563, AUTHORITY[\"EPSG\",\"7030\"]]')"\
 
  35              % compiler.process(element.clauses, **kw)
 
  38 @compiles(Geometry_DistanceSpheroid, 'sqlite') # type: ignore[no-untyped-call, misc]
 
  39 def _spatialite_distance_spheroid(element: Geometry_DistanceSpheroid,
 
  40                                   compiler: 'sa.Compiled', **kw: Any) -> str:
 
  41     return "COALESCE(Distance(%s, true), 0.0)" % compiler.process(element.clauses, **kw)
 
  44 class Geometry_IsLineLike(sa.sql.expression.FunctionElement[Any]):
 
  45     """ Check if the geometry is a line or multiline.
 
  47     name = 'Geometry_IsLineLike'
 
  51 @compiles(Geometry_IsLineLike) # type: ignore[no-untyped-call, misc]
 
  52 def _default_is_line_like(element: Geometry_IsLineLike,
 
  53                           compiler: 'sa.Compiled', **kw: Any) -> str:
 
  54     return "ST_GeometryType(%s) IN ('ST_LineString', 'ST_MultiLineString')" % \
 
  55                compiler.process(element.clauses, **kw)
 
  58 @compiles(Geometry_IsLineLike, 'sqlite') # type: ignore[no-untyped-call, misc]
 
  59 def _sqlite_is_line_like(element: Geometry_IsLineLike,
 
  60                          compiler: 'sa.Compiled', **kw: Any) -> str:
 
  61     return "ST_GeometryType(%s) IN ('LINESTRING', 'MULTILINESTRING')" % \
 
  62                compiler.process(element.clauses, **kw)
 
  65 class Geometry_IsAreaLike(sa.sql.expression.FunctionElement[Any]):
 
  66     """ Check if the geometry is a polygon or multipolygon.
 
  68     name = 'Geometry_IsLineLike'
 
  72 @compiles(Geometry_IsAreaLike) # type: ignore[no-untyped-call, misc]
 
  73 def _default_is_area_like(element: Geometry_IsAreaLike,
 
  74                           compiler: 'sa.Compiled', **kw: Any) -> str:
 
  75     return "ST_GeometryType(%s) IN ('ST_Polygon', 'ST_MultiPolygon')" % \
 
  76                compiler.process(element.clauses, **kw)
 
  79 @compiles(Geometry_IsAreaLike, 'sqlite') # type: ignore[no-untyped-call, misc]
 
  80 def _sqlite_is_area_like(element: Geometry_IsAreaLike,
 
  81                          compiler: 'sa.Compiled', **kw: Any) -> str:
 
  82     return "ST_GeometryType(%s) IN ('POLYGON', 'MULTIPOLYGON')" % \
 
  83                compiler.process(element.clauses, **kw)
 
  86 class Geometry_IntersectsBbox(sa.sql.expression.FunctionElement[Any]):
 
  87     """ Check if the bounding boxes of the given geometries intersect.
 
  89     name = 'Geometry_IntersectsBbox'
 
  93 @compiles(Geometry_IntersectsBbox) # type: ignore[no-untyped-call, misc]
 
  94 def _default_intersects(element: Geometry_IntersectsBbox,
 
  95                         compiler: 'sa.Compiled', **kw: Any) -> str:
 
  96     arg1, arg2 = list(element.clauses)
 
  97     return "%s && %s" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
 
 100 @compiles(Geometry_IntersectsBbox, 'sqlite') # type: ignore[no-untyped-call, misc]
 
 101 def _sqlite_intersects(element: Geometry_IntersectsBbox,
 
 102                        compiler: 'sa.Compiled', **kw: Any) -> str:
 
 103     return "MbrIntersects(%s) = 1" % compiler.process(element.clauses, **kw)
 
 106 class Geometry_ColumnIntersectsBbox(sa.sql.expression.FunctionElement[Any]):
 
 107     """ Check if the bounding box of the geometry intersects with the
 
 108         given table column, using the spatial index for the column.
 
 110         The index must exist or the query may return nothing.
 
 112     name = 'Geometry_ColumnIntersectsBbox'
 
 116 @compiles(Geometry_ColumnIntersectsBbox) # type: ignore[no-untyped-call, misc]
 
 117 def default_intersects_column(element: Geometry_ColumnIntersectsBbox,
 
 118                               compiler: 'sa.Compiled', **kw: Any) -> str:
 
 119     arg1, arg2 = list(element.clauses)
 
 120     return "%s && %s" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
 
 123 @compiles(Geometry_ColumnIntersectsBbox, 'sqlite') # type: ignore[no-untyped-call, misc]
 
 124 def spatialite_intersects_column(element: Geometry_ColumnIntersectsBbox,
 
 125                                  compiler: 'sa.Compiled', **kw: Any) -> str:
 
 126     arg1, arg2 = list(element.clauses)
 
 127     return "MbrIntersects(%s, %s) = 1 and "\
 
 128            "%s.ROWID IN (SELECT ROWID FROM SpatialIndex "\
 
 129                         "WHERE f_table_name = '%s' AND f_geometry_column = '%s' "\
 
 130                         "AND search_frame = %s)" %(
 
 131               compiler.process(arg1, **kw),
 
 132               compiler.process(arg2, **kw),
 
 133               arg1.table.name, arg1.table.name, arg1.name,
 
 134               compiler.process(arg2, **kw))
 
 137 class Geometry_ColumnDWithin(sa.sql.expression.FunctionElement[Any]):
 
 138     """ Check if the geometry is within the distance of the
 
 139         given table column, using the spatial index for the column.
 
 141         The index must exist or the query may return nothing.
 
 143     name = 'Geometry_ColumnDWithin'
 
 147 @compiles(Geometry_ColumnDWithin) # type: ignore[no-untyped-call, misc]
 
 148 def default_dwithin_column(element: Geometry_ColumnDWithin,
 
 149                            compiler: 'sa.Compiled', **kw: Any) -> str:
 
 150     return "ST_DWithin(%s)" % compiler.process(element.clauses, **kw)
 
 152 @compiles(Geometry_ColumnDWithin, 'sqlite') # type: ignore[no-untyped-call, misc]
 
 153 def spatialite_dwithin_column(element: Geometry_ColumnDWithin,
 
 154                               compiler: 'sa.Compiled', **kw: Any) -> str:
 
 155     geom1, geom2, dist = list(element.clauses)
 
 156     return "ST_Distance(%s, %s) < %s and "\
 
 157            "%s.ROWID IN (SELECT ROWID FROM SpatialIndex "\
 
 158                         "WHERE f_table_name = '%s' AND f_geometry_column = '%s' "\
 
 159                         "AND search_frame = ST_Expand(%s, %s))" %(
 
 160               compiler.process(geom1, **kw),
 
 161               compiler.process(geom2, **kw),
 
 162               compiler.process(dist, **kw),
 
 163               geom1.table.name, geom1.table.name, geom1.name,
 
 164               compiler.process(geom2, **kw),
 
 165               compiler.process(dist, **kw))
 
 168 class Geometry(types.UserDefinedType): # type: ignore[type-arg]
 
 169     """ Simplified type decorator for PostGIS geometry. This type
 
 170         only supports geometries in 4326 projection.
 
 174     def __init__(self, subtype: str = 'Geometry'):
 
 175         self.subtype = subtype
 
 178     def get_col_spec(self) -> str:
 
 179         return f'GEOMETRY({self.subtype}, 4326)'
 
 182     def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
 
 183         def process(value: Any) -> str:
 
 184             if isinstance(value, str):
 
 187             return cast(str, value.to_wkt())
 
 191     def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
 
 192         def process(value: Any) -> str:
 
 193             assert isinstance(value, str)
 
 198     def column_expression(self, col: SaColumn) -> SaColumn:
 
 199         return sa.func.ST_AsEWKB(col)
 
 202     def bind_expression(self, bindvalue: SaBind) -> SaColumn:
 
 203         return sa.func.ST_GeomFromText(bindvalue, sa.text('4326'), type_=self)
 
 206     class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
 
 208         def intersects(self, other: SaColumn, use_index: bool = True) -> 'sa.Operators':
 
 210                 return Geometry_IntersectsBbox(sa.func.coalesce(sa.null(), self.expr), other)
 
 212             if isinstance(self.expr, sa.Column):
 
 213                 return Geometry_ColumnIntersectsBbox(self.expr, other)
 
 215             return Geometry_IntersectsBbox(self.expr, other)
 
 218         def is_line_like(self) -> SaColumn:
 
 219             return Geometry_IsLineLike(self)
 
 222         def is_area(self) -> SaColumn:
 
 223             return Geometry_IsAreaLike(self)
 
 226         def within_distance(self, other: SaColumn, distance: SaColumn) -> SaColumn:
 
 227             if isinstance(self.expr, sa.Column):
 
 228                 return Geometry_ColumnDWithin(self.expr, other, distance)
 
 230             return self.ST_Distance(other) < distance
 
 233         def ST_Distance(self, other: SaColumn) -> SaColumn:
 
 234             return sa.func.ST_Distance(self, other, type_=sa.Float)
 
 237         def ST_Contains(self, other: SaColumn) -> SaColumn:
 
 238             return sa.func.ST_Contains(self, other, type_=sa.Boolean)
 
 241         def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
 
 242             return sa.func.ST_CoveredBy(self, other, type_=sa.Boolean)
 
 245         def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
 
 246             return sa.func.coalesce(sa.func.ST_ClosestPoint(self, other, type_=Geometry),
 
 250         def ST_Buffer(self, other: SaColumn) -> SaColumn:
 
 251             return sa.func.ST_Buffer(self, other, type_=Geometry)
 
 254         def ST_Expand(self, other: SaColumn) -> SaColumn:
 
 255             return sa.func.ST_Expand(self, other, type_=Geometry)
 
 258         def ST_Collect(self) -> SaColumn:
 
 259             return sa.func.ST_Collect(self, type_=Geometry)
 
 262         def ST_Centroid(self) -> SaColumn:
 
 263             return sa.func.ST_Centroid(self, type_=Geometry)
 
 266         def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
 
 267             return sa.func.ST_LineInterpolatePoint(self, other, type_=Geometry)
 
 270         def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
 
 271             return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)
 
 274         def distance_spheroid(self, other: SaColumn) -> SaColumn:
 
 275             return Geometry_DistanceSpheroid(self, other)
 
 278 @compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
 
 279 def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
 
 283 SQLITE_FUNCTION_ALIAS = (
 
 284     ('ST_AsEWKB', sa.Text, 'AsEWKB'),
 
 285     ('ST_GeomFromEWKT', Geometry, 'GeomFromEWKT'),
 
 286     ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
 
 287     ('ST_AsKML', sa.Text, 'AsKML'),
 
 288     ('ST_AsSVG', sa.Text, 'AsSVG'),
 
 289     ('ST_LineLocatePoint', sa.Float, 'ST_Line_Locate_Point'),
 
 290     ('ST_LineInterpolatePoint', sa.Float, 'ST_Line_Interpolate_Point'),
 
 293 def _add_function_alias(func: str, ftype: type, alias: str) -> None:
 
 294     _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
 
 298         "inherit_cache": True})
 
 300     func_templ = f"{alias}(%s)"
 
 302     def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
 
 303         return func_templ % compiler.process(element.clauses, **kw)
 
 305     compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
 
 307 for alias in SQLITE_FUNCTION_ALIAS:
 
 308     _add_function_alias(*alias)