1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Helper classes for filling the place table.
 
  13 from .geometry_alias import ALIASES
 
  17     """ Helper class to collect contents from a BDD table row and
 
  18         insert it into the place table.
 
  20     def __init__(self, grid=None):
 
  21         self.columns = {'admin_level': 15}
 
  25     def add_row(self, headings, row, force_name):
 
  26         """ Parse the content from the given behave row as place column data.
 
  28         for name, value in zip(headings, row):
 
  29             self._add(name, value)
 
  31         assert 'osm_type' in self.columns, "osm column missing"
 
  33         if force_name and 'name' not in self.columns:
 
  37                 ''.join(random.choices(string.printable, k=random.randrange(30))),
 
  42     def _add(self, key, value):
 
  43         if hasattr(self, '_set_key_' + key):
 
  44             getattr(self, '_set_key_' + key)(value)
 
  45         elif key.startswith('name+'):
 
  46             self._add_hstore('name', key[5:], value)
 
  47         elif key.startswith('extra+'):
 
  48             self._add_hstore('extratags', key[6:], value)
 
  49         elif key.startswith('addr+'):
 
  50             self._add_hstore('address', key[5:], value)
 
  51         elif key in ('name', 'address', 'extratags'):
 
  52             self.columns[key] = eval('{' + value + '}')
 
  54             assert key in ('class', 'type'), "Unknown column '{}'.".format(key)
 
  55             self.columns[key] = None if value == '' else value
 
  57     def _set_key_name(self, value):
 
  58         self._add_hstore('name', 'name', value)
 
  60     def _set_key_osm(self, value):
 
  61         assert value[0] in 'NRW' and value[1:].isdigit(), \
 
  62                "OSM id needs to be of format <NRW><id>."
 
  64         self.columns['osm_type'] = value[0]
 
  65         self.columns['osm_id'] = int(value[1:])
 
  67     def _set_key_admin(self, value):
 
  68         self.columns['admin_level'] = int(value)
 
  70     def _set_key_housenr(self, value):
 
  72             self._add_hstore('address', 'housenumber', value)
 
  74     def _set_key_postcode(self, value):
 
  76             self._add_hstore('address', 'postcode', value)
 
  78     def _set_key_street(self, value):
 
  80             self._add_hstore('address', 'street', value)
 
  82     def _set_key_addr_place(self, value):
 
  84             self._add_hstore('address', 'place', value)
 
  86     def _set_key_country(self, value):
 
  88             self._add_hstore('address', 'country', value)
 
  90     def _set_key_geometry(self, value):
 
  91         if value.startswith('country:'):
 
  92             ccode = value[8:].upper()
 
  93             self.geometry = "ST_SetSRID(ST_Point({}, {}), 4326)".format(*ALIASES[ccode])
 
  94         elif ',' not in value:
 
  96                 pt = self.grid.parse_point(value)
 
  99             self.geometry = f"ST_SetSRID(ST_Point({pt[0]}, {pt[1]}), 4326)"
 
 100         elif '(' not in value:
 
 102                 coords = ','.join(' '.join(f"{p:.7f}" for p in pt)
 
 103                                   for pt in self.grid.parse_line(value))
 
 106             self.geometry = f"'srid=4326;LINESTRING({coords})'::geometry"
 
 109                 coords = ','.join(' '.join(f"{p:.7f}" for p in pt)
 
 110                                   for pt in self.grid.parse_line(value[1:-1]))
 
 113             self.geometry = f"'srid=4326;POLYGON(({coords}))'::geometry"
 
 115     def _add_hstore(self, column, key, value):
 
 116         if column in self.columns:
 
 117             self.columns[column][key] = value
 
 119             self.columns[column] = {key: value}
 
 122         if self.columns['osm_type'] == 'N' and self.geometry is None:
 
 123             pt = self.grid.get(str(self.columns['osm_id'])) if self.grid else None
 
 125                 pt = (random.uniform(-180, 180), random.uniform(-90, 90))
 
 127             return "ST_SetSRID(ST_Point({}, {}), 4326)".format(*pt)
 
 129         assert self.geometry is not None, "Geometry missing"
 
 132     def db_delete(self, cursor):
 
 133         """ Issue a delete for the given OSM object.
 
 135         cursor.execute('DELETE FROM place WHERE osm_type = %s and osm_id = %s',
 
 136                        (self.columns['osm_type'], self.columns['osm_id']))
 
 138     def db_insert(self, cursor):
 
 139         """ Insert the collected data into the database.
 
 141         query = 'INSERT INTO place ({}, geometry) values({}, {})'.format(
 
 142             ','.join(self.columns.keys()),
 
 143             ','.join(['%s' for x in range(len(self.columns))]),
 
 145         cursor.execute(query, list(self.columns.values()))