1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Complex datatypes used by the Nominatim API.
 
  10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
 
  12 from collections import abc
 
  16 from struct import unpack
 
  17 from binascii import unhexlify
 
  19 from nominatim.errors import UsageError
 
  20 from nominatim.api.localization import Locales
 
  22 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
 
  24 @dataclasses.dataclass
 
  26     """ Reference a place by Nominatim's internal ID.
 
  28         A PlaceID may reference place from the main table placex, from
 
  29         the interpolation tables or the postcode tables. Place IDs are not
 
  30         stable between installations. You may use this type theefore only
 
  31         with place IDs obtained from the same database.
 
  35     The internal ID of the place to reference.
 
  39 @dataclasses.dataclass
 
  41     """ Reference a place by its OSM ID and potentially the basic category.
 
  43         The OSM ID may refer to places in the main table placex and OSM
 
  47     """ OSM type of the object. Must be one of `N`(node), `W`(way) or
 
  51     """ The OSM ID of the object.
 
  53     osm_class: Optional[str] = None
 
  54     """ The same OSM object may appear multiple times in the database under
 
  55         different categories. The optional class parameter allows to distinguish
 
  56         the different categories and corresponds to the key part of the category.
 
  57         If there are multiple objects in the database and `osm_class` is
 
  58         left out, then one of the objects is returned at random.
 
  61     def __post_init__(self) -> None:
 
  62         if self.osm_type not in ('N', 'W', 'R'):
 
  63             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
 
  66 PlaceRef = Union[PlaceID, OsmID]
 
  69 class Point(NamedTuple):
 
  70     """ A geographic point in WGS84 projection.
 
  77     def lat(self) -> float:
 
  78         """ Return the latitude of the point.
 
  84     def lon(self) -> float:
 
  85         """ Return the longitude of the point.
 
  90     def to_geojson(self) -> str:
 
  91         """ Return the point in GeoJSON format.
 
  93         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
 
  97     def from_wkb(wkb: Union[str, bytes]) -> 'Point':
 
  98         """ Create a point from EWKB as returned from the database.
 
 100         if isinstance(wkb, str):
 
 103             raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
 
 105             gtype, srid, x, y = unpack('>iidd', wkb[1:])
 
 107             gtype, srid, x, y = unpack('<iidd', wkb[1:])
 
 109             raise ValueError("WKB has unknown endian value.")
 
 111         if gtype != 0x20000001:
 
 112             raise ValueError("WKB must be a point geometry.")
 
 114             raise ValueError("Only WGS84 WKB supported.")
 
 120     def from_param(inp: Any) -> 'Point':
 
 121         """ Create a point from an input parameter. The parameter
 
 122             may be given as a point, a string or a sequence of
 
 123             strings or floats. Raises a UsageError if the format is
 
 126         if isinstance(inp, Point):
 
 130         if isinstance(inp, str):
 
 132         elif isinstance(inp, abc.Sequence):
 
 136             raise UsageError('Point parameter needs 2 coordinates.')
 
 138             x, y = filter(math.isfinite, map(float, seq))
 
 139         except ValueError as exc:
 
 140             raise UsageError('Point parameter needs to be numbers.') from exc
 
 142         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
 
 143             raise UsageError('Point coordinates invalid.')
 
 148     def to_wkt(self) -> str:
 
 149         """ Return the WKT representation of the point.
 
 151         return f'POINT({self.x} {self.y})'
 
 155 AnyPoint = Union[Point, Tuple[float, float]]
 
 157 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
 
 158 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
 
 161     """ A bounding box in WGS84 projection.
 
 163         The coordinates are available as an array in the 'coord'
 
 164         property in the order (minx, miny, maxx, maxy).
 
 166     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
 
 167         """ Create a new bounding box with the given coordinates in WGS84
 
 170         self.coords = (minx, miny, maxx, maxy)
 
 174     def minlat(self) -> float:
 
 175         """ Southern-most latitude, corresponding to the minimum y coordinate.
 
 177         return self.coords[1]
 
 181     def maxlat(self) -> float:
 
 182         """ Northern-most latitude, corresponding to the maximum y coordinate.
 
 184         return self.coords[3]
 
 188     def minlon(self) -> float:
 
 189         """ Western-most longitude, corresponding to the minimum x coordinate.
 
 191         return self.coords[0]
 
 195     def maxlon(self) -> float:
 
 196         """ Eastern-most longitude, corresponding to the maximum x coordinate.
 
 198         return self.coords[2]
 
 202     def area(self) -> float:
 
 203         """ Return the area of the box in WGS84.
 
 205         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
 
 208     def contains(self, pt: Point) -> bool:
 
 209         """ Check if the point is inside or on the boundary of the box.
 
 211         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
 
 212                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
 
 215     def to_wkt(self) -> str:
 
 216         """ Return the WKT representation of the Bbox. This
 
 217             is a simple polygon with four points.
 
 219         return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'\
 
 220                   .format(*self.coords) # pylint: disable=consider-using-f-string
 
 224     def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
 
 225         """ Create a Bbox from a bounding box polygon as returned by
 
 226             the database. Returns `None` if the input value is None.
 
 231         if isinstance(wkb, str):
 
 235             raise ValueError("WKB must be a bounding box polygon")
 
 236         if wkb.startswith(WKB_BBOX_HEADER_LE):
 
 237             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
 
 238         elif wkb.startswith(WKB_BBOX_HEADER_BE):
 
 239             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
 
 241             raise ValueError("WKB has wrong header")
 
 243         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
 
 247     def from_point(pt: Point, buffer: float) -> 'Bbox':
 
 248         """ Return a Bbox around the point with the buffer added to all sides.
 
 250         return Bbox(pt[0] - buffer, pt[1] - buffer,
 
 251                     pt[0] + buffer, pt[1] + buffer)
 
 255     def from_param(inp: Any) -> 'Bbox':
 
 256         """ Return a Bbox from an input parameter. The box may be
 
 257             given as a Bbox, a string or a list or strings or integer.
 
 258             Raises a UsageError if the format is incorrect.
 
 260         if isinstance(inp, Bbox):
 
 264         if isinstance(inp, str):
 
 266         elif isinstance(inp, abc.Sequence):
 
 270             raise UsageError('Bounding box parameter needs 4 coordinates.')
 
 272             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
 
 273         except ValueError as exc:
 
 274             raise UsageError('Bounding box parameter needs to be numbers.') from exc
 
 276         x1 = min(180, max(-180, x1))
 
 277         x2 = min(180, max(-180, x2))
 
 278         y1 = min(90, max(-90, y1))
 
 279         y2 = min(90, max(-90, y2))
 
 281         if x1 == x2 or y1 == y2:
 
 282             raise UsageError('Bounding box with invalid parameters.')
 
 284         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
 
 287 class GeometryFormat(enum.Flag):
 
 288     """ All search functions support returning the full geometry of a place in
 
 289         various formats. The internal geometry is converted by PostGIS to
 
 290         the desired format and then returned as a string. It is possible to
 
 291         request multiple formats at the same time.
 
 294     """ No geometry requested. Alias for a empty flag.
 
 296     GEOJSON = enum.auto()
 
 298     [GeoJSON](https://geojson.org/) format
 
 302     [KML](https://en.wikipedia.org/wiki/Keyhole_Markup_Language) format
 
 306     [SVG](http://www.w3.org/TR/SVG/paths.html) format
 
 310     [WKT](https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry) format
 
 314 class DataLayer(enum.Flag):
 
 315     """ The `DataLayer` flag type defines the layers that can be selected
 
 316         for reverse and forward search.
 
 318     ADDRESS = enum.auto()
 
 319     """ The address layer contains all places relevant for addresses:
 
 320         fully qualified addresses with a house number (or a house name equivalent,
 
 321         for some addresses) and places that can be part of an address like
 
 322         roads, cities, states.
 
 325     """ Layer for points of interest like shops, restaurants but also
 
 326         recycling bins or postboxes.
 
 328     RAILWAY = enum.auto()
 
 329     """ Layer with railway features including tracks and other infrastructure.
 
 330         Note that in Nominatim's standard configuration, only very few railway
 
 331         features are imported into the database. Thus a custom configuration
 
 332         is required to make full use of this layer.
 
 334     NATURAL = enum.auto()
 
 335     """ Layer with natural features like rivers, lakes and mountains.
 
 337     MANMADE = enum.auto()
 
 338     """ Layer with other human-made features and boundaries. This layer is
 
 339         the catch-all and includes all features not covered by the other
 
 340         layers. A typical example for this layer are national park boundaries.
 
 344 def format_country(cc: Any) -> List[str]:
 
 345     """ Extract a list of country codes from the input which may be either
 
 346         a string or list of strings. Filters out all values that are not
 
 350     if isinstance(cc, str):
 
 351         clist = cc.split(',')
 
 352     elif isinstance(cc, abc.Sequence):
 
 355         raise UsageError("Parameter 'country' needs to be a comma-separated list "
 
 356                          "or a Python list of strings.")
 
 358     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
 
 361 def format_excluded(ids: Any) -> List[int]:
 
 362     """ Extract a list of place ids from the input which may be either
 
 363         a string or a list of strings or ints. Ignores empty value but
 
 364         throws a UserError on anything that cannot be converted to int.
 
 367     if isinstance(ids, str):
 
 368         plist = [s.strip() for s in ids.split(',')]
 
 369     elif isinstance(ids, abc.Sequence):
 
 372         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
 
 373                          "or a Python list of numbers.")
 
 374     if not all(isinstance(i, int) or
 
 375                (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
 
 376         raise UsageError("Parameter 'excluded' only takes place IDs.")
 
 378     return [int(id) for id in plist if id] or [0]
 
 381 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
 
 382     """ Extract a list of categories. Currently a noop.
 
 386 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
 
 388 @dataclasses.dataclass
 
 390     """ Collection of parameters that define which kind of details are
 
 391         returned with a lookup or details result.
 
 393     geometry_output: GeometryFormat = GeometryFormat.NONE
 
 394     """ Add the full geometry of the place to the result. Multiple
 
 395         formats may be selected. Note that geometries can become quite large.
 
 397     address_details: bool = False
 
 398     """ Get detailed information on the places that make up the address
 
 401     linked_places: bool = False
 
 402     """ Get detailed information on the places that link to the result.
 
 404     parented_places: bool = False
 
 405     """ Get detailed information on all places that this place is a parent
 
 406         for, i.e. all places for which it provides the address details.
 
 407         Only POI places can have parents.
 
 409     keywords: bool = False
 
 410     """ Add information about the search terms used for this place.
 
 412     geometry_simplification: float = 0.0
 
 413     """ Simplification factor for a geometry in degrees WGS. A factor of
 
 414         0.0 means the original geometry is kept. The higher the value, the
 
 415         more the geometry gets simplified.
 
 417     locales: Locales = Locales()
 
 418     """ Preferred languages for localization of results.
 
 422     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
 
 423         """ Load the data fields of the class from a dictionary.
 
 424             Unknown entries in the dictionary are ignored, missing ones
 
 425             get the default setting.
 
 427             The function supports type checking and throws a UsageError
 
 428             when the value does not fit.
 
 430         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
 
 432                 return field.default_factory() \
 
 433                        if field.default_factory != dataclasses.MISSING \
 
 435             if field.metadata and 'transform' in field.metadata:
 
 436                 return field.metadata['transform'](v)
 
 437             if not isinstance(v, field.type):
 
 438                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
 
 441         return cls(**{f.name: _check_field(kwargs[f.name], f)
 
 442                       for f in dataclasses.fields(cls) if f.name in kwargs})
 
 445 @dataclasses.dataclass
 
 446 class ReverseDetails(LookupDetails):
 
 447     """ Collection of parameters for the reverse call.
 
 449     max_rank: int = dataclasses.field(default=30,
 
 450                                       metadata={'transform': lambda v: max(0, min(v, 30))}
 
 452     """ Highest address rank to return.
 
 454     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
 
 455     """ Filter which kind of data to include.
 
 458 @dataclasses.dataclass
 
 459 class SearchDetails(LookupDetails):
 
 460     """ Collection of parameters for the search call.
 
 462     max_results: int = 10
 
 463     """ Maximum number of results to be returned. The actual number of results
 
 466     min_rank: int = dataclasses.field(default=0,
 
 467                                       metadata={'transform': lambda v: max(0, min(v, 30))}
 
 469     """ Lowest address rank to return.
 
 471     max_rank: int = dataclasses.field(default=30,
 
 472                                       metadata={'transform': lambda v: max(0, min(v, 30))}
 
 474     """ Highest address rank to return.
 
 476     layers: Optional[DataLayer] = dataclasses.field(default=None,
 
 477                                                     metadata={'transform': lambda r : r})
 
 478     """ Filter which kind of data to include. When 'None' (the default) then
 
 479         filtering by layers is disabled.
 
 481     countries: List[str] = dataclasses.field(default_factory=list,
 
 482                                              metadata={'transform': format_country})
 
 483     """ Restrict search results to the given countries. An empty list (the
 
 484         default) will disable this filter.
 
 486     excluded: List[int] = dataclasses.field(default_factory=list,
 
 487                                             metadata={'transform': format_excluded})
 
 488     """ List of OSM objects to exclude from the results. Currently only
 
 489         works when the internal place ID is given.
 
 490         An empty list (the default) will disable this filter.
 
 492     viewbox: Optional[Bbox] = dataclasses.field(default=None,
 
 493                                                 metadata={'transform': Bbox.from_param})
 
 494     """ Focus the search on a given map area.
 
 496     bounded_viewbox: bool = False
 
 497     """ Use 'viewbox' as a filter and restrict results to places within the
 
 500     near: Optional[Point] = dataclasses.field(default=None,
 
 501                                               metadata={'transform': Point.from_param})
 
 502     """ Order results by distance to the given point.
 
 504     near_radius: Optional[float] = dataclasses.field(default=None,
 
 505                                               metadata={'transform': lambda r : r})
 
 506     """ Use near point as a filter and drop results outside the given
 
 507         radius. Radius is given in degrees WSG84.
 
 509     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
 
 510                                                           metadata={'transform': format_categories})
 
 511     """ Restrict search to places with one of the given class/type categories.
 
 512         An empty list (the default) will disable this filter.
 
 514     viewbox_x2: Optional[Bbox] = None
 
 516     def __post_init__(self) -> None:
 
 517         if self.viewbox is not None:
 
 518             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
 
 519             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
 
 520             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
 
 521                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
 
 524     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
 
 525         """ Change the min_rank and max_rank fields to respect the
 
 528         assert new_min <= new_max
 
 529         self.min_rank = max(self.min_rank, new_min)
 
 530         self.max_rank = min(self.max_rank, new_max)
 
 533     def is_impossible(self) -> bool:
 
 534         """ Check if the parameter configuration is contradictionary and
 
 535             cannot yield any results.
 
 537         return (self.min_rank > self.max_rank
 
 538                 or (self.bounded_viewbox
 
 539                     and self.viewbox is not None and self.near is not None
 
 540                     and self.viewbox.contains(self.near))
 
 541                 or (self.layers is not None and not self.layers)
 
 542                 or (self.max_rank <= 4 and
 
 543                     self.layers is not None and not self.layers & DataLayer.ADDRESS))
 
 546     def layer_enabled(self, layer: DataLayer) -> bool:
 
 547         """ Check if the given layer has been chosen. Also returns
 
 548             true when layer restriction has been disabled completely.
 
 550         return self.layers is None or bool(self.layers & layer)