]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/db/sqlalchemy_types.py
4171d70b0d19ce20d764cea22a3ec55a7e87d5c9
[nominatim.git] / nominatim / db / sqlalchemy_types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Custom types for SQLAlchemy.
9 """
10 from __future__ import annotations
11 from typing import Callable, Any, cast
12 import sys
13
14 import sqlalchemy as sa
15 from sqlalchemy.ext.compiler import compiles
16 from sqlalchemy import types
17
18 from nominatim.typing import SaColumn, SaBind
19
20 #pylint: disable=all
21
class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
    """ SQL function element yielding the distance in meters between
        its arguments, measured on the WGS 84 spheroid.

        The element itself only carries the name and the float result
        type; the SQL text is produced by the dialect-specific
        `@compiles` hooks registered for this class.
    """
    name = 'Geometry_DistanceSpheroid'
    type = sa.Float()
    inherit_cache = True
28
29
@compiles(Geometry_DistanceSpheroid) # type: ignore[no-untyped-call, misc]
def _default_distance_spheroid(element: SaColumn,
                               compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Default rendering of Geometry_DistanceSpheroid: a call to
        ST_DistanceSpheroid() with the compiled arguments of the element
        followed by a literal WGS 84 spheroid definition.
    """
    arguments = compiler.process(element.clauses, **kw)
    spheroid = 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]'

    return f"ST_DistanceSpheroid({arguments}, '{spheroid}')"
36
37
@compiles(Geometry_DistanceSpheroid, 'sqlite') # type: ignore[no-untyped-call, misc]
def _spatialite_distance_spheroid(element: SaColumn,
                                  compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering of Geometry_DistanceSpheroid for the 'sqlite' dialect:
        a call to Distance() with the compiled arguments of the element
        and a trailing `true` flag.
    """
    arguments = compiler.process(element.clauses, **kw)

    return f"Distance({arguments}, true)"
42
43
class Geometry(types.UserDefinedType): # type: ignore[type-arg]
    """ Simplified column type for PostGIS geometries. Only geometries
        in the 4326 projection are supported.

        Values are bound as WKT text wrapped in ST_GeomFromText() and
        columns are selected through ST_AsEWKB().
    """
    cache_ok = True

    def __init__(self, subtype: str = 'Geometry'):
        # Geometry subtype used when rendering the column definition.
        self.subtype = subtype


    def get_col_spec(self) -> str:
        """ Return the column definition used in DDL statements.
        """
        return f'GEOMETRY({self.subtype}, 4326)'


    def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
        """ Return the converter for bound parameters. Strings are passed
            through unchanged, any other value is converted by calling
            its `to_wkt()` method.
        """
        def _as_wkt(value: Any) -> str:
            return value if isinstance(value, str) else cast(str, value.to_wkt())

        return _as_wkt


    def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
        """ Return the converter for result values. The value is handed
            on as is; it is asserted to be a string.
        """
        def _passthrough(value: Any) -> str:
            assert isinstance(value, str)
            return value

        return _passthrough


    def column_expression(self, col: SaColumn) -> SaColumn:
        """ Wrap a selected column of this type in ST_AsEWKB().
        """
        return sa.func.ST_AsEWKB(col)


    def bind_expression(self, bindvalue: SaBind) -> SaColumn:
        """ Wrap a bound parameter in ST_GeomFromText() with SRID 4326.
        """
        srid = sa.text('4326')

        return sa.func.ST_GeomFromText(bindvalue, srid, type_=self)


    class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
        """ Additional operators and SQL function shortcuts available
            on expressions of type Geometry.
        """

        def intersects(self, other: SaColumn) -> 'sa.Operators':
            """ Apply the `&&` operator (bounding-box overlap in PostGIS)
                to this expression and `other`.
            """
            overlaps = self.op('&&')

            return overlaps(other)

        def is_line_like(self) -> SaColumn:
            """ Check if ST_GeometryType() reports a linestring or
                multi-linestring.
            """
            geometry_type = sa.func.ST_GeometryType(self, type_=sa.String)

            return geometry_type.in_(('ST_LineString', 'ST_MultiLineString'))

        def is_area(self) -> SaColumn:
            """ Check if ST_GeometryType() reports a polygon or
                multi-polygon.
            """
            geometry_type = sa.func.ST_GeometryType(self, type_=sa.String)

            return geometry_type.in_(('ST_Polygon', 'ST_MultiPolygon'))


        def ST_DWithin(self, other: SaColumn, distance: SaColumn) -> SaColumn:
            """ Boolean expression ST_DWithin(self, other, distance).
            """
            return sa.func.ST_DWithin(self, other, distance,
                                      type_=sa.Boolean)


        def ST_DWithin_no_index(self, other: SaColumn, distance: SaColumn) -> SaColumn:
            """ Same as ST_DWithin() but with this expression wrapped in
                coalesce(NULL, ...). Going by the name, the wrapping is
                presumably there to stop the database from using an
                index on the column.
            """
            wrapped = sa.func.coalesce(sa.null(), self)

            return sa.func.ST_DWithin(wrapped, other, distance,
                                      type_=sa.Boolean)


        def ST_Intersects_no_index(self, other: SaColumn) -> 'sa.Operators':
            """ Apply the `&&` operator with this expression wrapped in
                coalesce(NULL, ...). Going by the name, the wrapping is
                presumably there to stop the database from using an
                index on the column.
            """
            wrapped = sa.func.coalesce(sa.null(), self)

            return wrapped.op('&&')(other)


        def ST_Distance(self, other: SaColumn) -> SaColumn:
            """ Float expression ST_Distance(self, other).
            """
            return sa.func.ST_Distance(self, other,
                                       type_=sa.Float)


        def ST_Contains(self, other: SaColumn) -> SaColumn:
            """ Boolean expression ST_Contains(self, other).
            """
            return sa.func.ST_Contains(self, other,
                                       type_=sa.Boolean)


        def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
            """ Boolean expression ST_CoveredBy(self, other).
            """
            return sa.func.ST_CoveredBy(self, other,
                                        type_=sa.Boolean)


        def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
            """ Geometry expression ST_ClosestPoint(self, other).
            """
            return sa.func.ST_ClosestPoint(self, other,
                                           type_=Geometry)


        def ST_Buffer(self, other: SaColumn) -> SaColumn:
            """ Geometry expression ST_Buffer(self, other).
            """
            return sa.func.ST_Buffer(self, other,
                                     type_=Geometry)


        def ST_Expand(self, other: SaColumn) -> SaColumn:
            """ Geometry expression ST_Expand(self, other).
            """
            return sa.func.ST_Expand(self, other,
                                     type_=Geometry)


        def ST_Collect(self) -> SaColumn:
            """ Geometry expression ST_Collect(self).
            """
            return sa.func.ST_Collect(self,
                                      type_=Geometry)


        def ST_Centroid(self) -> SaColumn:
            """ Geometry expression ST_Centroid(self).
            """
            return sa.func.ST_Centroid(self,
                                       type_=Geometry)


        def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
            """ Geometry expression ST_LineInterpolatePoint(self, other).
            """
            return sa.func.ST_LineInterpolatePoint(self, other,
                                                   type_=Geometry)


        def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
            """ Float expression ST_LineLocatePoint(self, other).
            """
            return sa.func.ST_LineLocatePoint(self, other,
                                              type_=sa.Float)


        def distance_spheroid(self, other: SaColumn) -> SaColumn:
            """ Build a Geometry_DistanceSpheroid function element from
                this expression and `other`.
            """
            return Geometry_DistanceSpheroid(self, other)
151
152
@compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
    """ Render the Geometry type for the 'sqlite' dialect as plain
        `GEOMETRY`, i.e. without the subtype and SRID that
        Geometry.get_col_spec() adds.
    """
    return 'GEOMETRY'
156
157
# SQL functions that get a different name when compiled for the 'sqlite'
# dialect. Each entry is a tuple of
#   (SQL function name, return type, name rendered for 'sqlite')
# and matches the parameters of _add_function_alias().
SQLITE_FUNCTION_ALIAS = (
    ('ST_AsEWKB', sa.Text, 'AsEWKB'),
    ('ST_GeomFromEWKT', Geometry, 'GeomFromEWKT'),
    ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
    ('ST_AsKML', sa.Text, 'AsKML'),
    ('ST_AsSVG', sa.Text, 'AsSVG'),
)
165
def _add_function_alias(func: str, ftype: type, alias: str) -> None:
    """ Define a GenericFunction subclass for the SQL function `func`
        with return type `ftype` and register a compilation hook that
        renders it as `alias(...)` for the 'sqlite' dialect.
    """
    attributes = {
        "type": ftype,
        "name": func,
        "identifier": func,
        "inherit_cache": True,
    }
    # The class is created dynamically so that it carries the name of
    # the SQL function it represents.
    function_class = type(func, (sa.sql.functions.GenericFunction, ), attributes)

    def _render_for_sqlite(element: Any, compiler: Any, **kw: Any) -> Any:
        arguments = compiler.process(element.clauses, **kw)
        return f"{alias}({arguments})"

    sqlite_hook = compiles(function_class, 'sqlite') # type: ignore[no-untyped-call]
    sqlite_hook(_render_for_sqlite)
179
# Register all function aliases from SQLITE_FUNCTION_ALIAS as soon as
# the module is imported.
for alias in SQLITE_FUNCTION_ALIAS:
    _add_function_alias(*alias)
182
183
184