1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Custom types for SQLAlchemy.
10 from __future__ import annotations
11 from typing import Callable, Any, cast
14 import sqlalchemy as sa
15 from sqlalchemy.ext.compiler import compiles
16 from sqlalchemy import types
18 from nominatim.typing import SaColumn, SaBind
22 class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
23 """ Function to compute the spherical distance in meters.
26 name = 'Geometry_DistanceSpheroid'
@compiles(Geometry_DistanceSpheroid) # type: ignore[no-untyped-call, misc]
def _default_distance_spheroid(element: SaColumn,
                               compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Default SQL rendering of Geometry_DistanceSpheroid: a call to
        ST_DistanceSpheroid() on the compiled clauses of the element
        together with a fixed WGS 84 spheroid definition.
    """
    template = "ST_DistanceSpheroid(%s,"\
               " 'SPHEROID[\"WGS 84\",6378137,298.257223563, AUTHORITY[\"EPSG\",\"7030\"]]')"
    rendered_args = compiler.process(element.clauses, **kw)

    return template % rendered_args
@compiles(Geometry_DistanceSpheroid, 'sqlite') # type: ignore[no-untyped-call, misc]
def _spatialite_distance_spheroid(element: SaColumn,
                                  compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering of Geometry_DistanceSpheroid for the 'sqlite' dialect:
        a call to Distance() on the compiled clauses of the element
        with `true` as trailing argument.
    """
    rendered_args = compiler.process(element.clauses, **kw)

    return "Distance(%s, true)" % rendered_args
44 class Geometry(types.UserDefinedType): # type: ignore[type-arg]
45 """ Simplified type decorator for PostGIS geometry. This type
46 only supports geometries in 4326 projection.
def __init__(self, subtype: str = 'Geometry') -> None:
    """ Create the type for the given geometry subtype. The name is
        stored as is in the `subtype` attribute.
    """
    self.subtype = subtype
def get_col_spec(self) -> str:
    """ Return the column type specification: a GEOMETRY of the
        configured subtype with SRID 4326.
    """
    geometry_kind = self.subtype

    return f'GEOMETRY({geometry_kind}, 4326)'
def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
    """ Return the converter that is applied to values bound to
        this type.

        Strings are handed on unchanged. Any other value must offer a
        `to_wkt()` method, whose result is used instead.
    """
    def process(value: Any) -> str:
        if isinstance(value, str):
            return value

        return cast(str, value.to_wkt())

    return process
def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
    """ Return the converter that is applied to values of this type
        read from a result row.

        Values are required to be strings and are returned unchanged.
    """
    def process(value: Any) -> str:
        # Narrows the type for the type checker; not an input validation.
        assert isinstance(value, str)
        return value

    return process
def column_expression(self, col: SaColumn) -> SaColumn:
    """ Wrap a column of this type in a call to ST_AsEWKB().
    """
    as_ewkb = sa.func.ST_AsEWKB

    return as_ewkb(col)
def bind_expression(self, bindvalue: SaBind) -> SaColumn:
    """ Wrap a bound value in a call to ST_GeomFromText() with
        4326 as second argument. The result is typed as this
        geometry type.
    """
    srid = sa.text('4326')

    return sa.func.ST_GeomFromText(bindvalue, srid, type_=self)
82 class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
def intersects(self, other: SaColumn) -> 'sa.Operators':
    """ Return an expression that applies the SQL operator `&&`
        to this geometry and `other`.
    """
    overlap = self.op('&&')

    return overlap(other)
def is_line_like(self) -> SaColumn:
    """ Return an expression that checks if ST_GeometryType() of
        this geometry is 'ST_LineString' or 'ST_MultiLineString'.
    """
    line_types = ('ST_LineString', 'ST_MultiLineString')
    geometry_type = sa.func.ST_GeometryType(self, type_=sa.String)

    return geometry_type.in_(line_types)
91 def is_area(self) -> SaColumn:
92 return sa.func.ST_GeometryType(self, type_=sa.String).in_(('ST_Polygon',
def ST_DWithin(self, other: SaColumn, distance: SaColumn) -> SaColumn:
    """ Return a boolean ST_DWithin() expression on this geometry,
        `other` and `distance`.
    """
    dwithin = sa.func.ST_DWithin

    return dwithin(self, other, distance, type_=sa.Boolean)
def ST_DWithin_no_index(self, other: SaColumn, distance: SaColumn) -> SaColumn:
    """ Return a boolean ST_DWithin() expression where this geometry
        is first wrapped in `coalesce(NULL, <geometry>)`.

        NOTE(review): going by the method name, the wrapper is meant
        to stop the database from using an index on the column -
        confirm.
    """
    wrapped = sa.func.coalesce(sa.null(), self)

    return sa.func.ST_DWithin(wrapped, other, distance, type_=sa.Boolean)
def ST_Intersects_no_index(self, other: SaColumn) -> 'sa.Operators':
    """ Return an expression that applies the SQL operator `&&` to
        `coalesce(NULL, <geometry>)` and `other`.

        NOTE(review): going by the method name, the wrapper is meant
        to stop the database from using an index on the column -
        confirm.
    """
    wrapped = sa.func.coalesce(sa.null(), self)
    overlap = wrapped.op('&&')

    return overlap(other)
def ST_Distance(self, other: SaColumn) -> SaColumn:
    """ Return a float-typed ST_Distance() expression between this
        geometry and `other`.
    """
    distance = sa.func.ST_Distance

    return distance(self, other, type_=sa.Float)
def ST_Contains(self, other: SaColumn) -> SaColumn:
    """ Return a boolean ST_Contains() expression with this geometry
        as first and `other` as second argument.
    """
    contains = sa.func.ST_Contains

    return contains(self, other, type_=sa.Boolean)
def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
    """ Return a boolean ST_CoveredBy() expression with this geometry
        as first and `other` as second argument.
    """
    covered_by = sa.func.ST_CoveredBy

    return covered_by(self, other, type_=sa.Boolean)
def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
    """ Return a geometry-typed ST_ClosestPoint() expression with
        this geometry as first and `other` as second argument.
    """
    closest_point = sa.func.ST_ClosestPoint

    return closest_point(self, other, type_=Geometry)
def ST_Buffer(self, other: SaColumn) -> SaColumn:
    """ Return a geometry-typed ST_Buffer() expression with this
        geometry as first and `other` as second argument.
    """
    buffer_func = sa.func.ST_Buffer

    return buffer_func(self, other, type_=Geometry)
def ST_Expand(self, other: SaColumn) -> SaColumn:
    """ Return a geometry-typed ST_Expand() expression with this
        geometry as first and `other` as second argument.
    """
    expand = sa.func.ST_Expand

    return expand(self, other, type_=Geometry)
def ST_Collect(self) -> SaColumn:
    """ Return a geometry-typed ST_Collect() expression over this
        geometry.
    """
    collect = sa.func.ST_Collect

    return collect(self, type_=Geometry)
def ST_Centroid(self) -> SaColumn:
    """ Return a geometry-typed ST_Centroid() expression over this
        geometry.
    """
    centroid = sa.func.ST_Centroid

    return centroid(self, type_=Geometry)
def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
    """ Return a geometry-typed ST_LineInterpolatePoint() expression
        with this geometry as first and `other` as second argument.
    """
    interpolate = sa.func.ST_LineInterpolatePoint

    return interpolate(self, other, type_=Geometry)
def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
    """ Return a float-typed ST_LineLocatePoint() expression with
        this geometry as first and `other` as second argument.
    """
    locate = sa.func.ST_LineLocatePoint

    return locate(self, other, type_=sa.Float)
def distance_spheroid(self, other: SaColumn) -> SaColumn:
    """ Return a Geometry_DistanceSpheroid function element over this
        geometry and `other`. How it is rendered depends on the
        dialect-specific compilers registered for that element.
    """
    spheroid_distance = Geometry_DistanceSpheroid(self, other)

    return spheroid_distance
153 @compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
154 def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
158 SQLITE_FUNCTION_ALIAS = (
159 ('ST_AsEWKB', sa.Text, 'AsEWKB'),
160 ('ST_GeomFromEWKT', Geometry, 'GeomFromEWKT'),
161 ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
162 ('ST_AsKML', sa.Text, 'AsKML'),
163 ('ST_AsSVG', sa.Text, 'AsSVG'),
166 def _add_function_alias(func: str, ftype: type, alias: str) -> None:
167 _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
171 "inherit_cache": True})
173 func_templ = f"{alias}(%s)"
175 def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
176 return func_templ % compiler.process(element.clauses, **kw)
178 compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
# Register every entry of the alias table. Each entry is unpacked into
# the (func, ftype, alias) parameters of _add_function_alias(), which
# installs a compilation rule for the 'sqlite' dialect.
for alias in SQLITE_FUNCTION_ALIAS:
    _add_function_alias(*alias)