1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions to facilitate accessing and comparing the content of DB tables.
 
  14 from psycopg import sql as pysql
 
  16 from steps.check_functions import Almost
 
  18 ID_REGEX = re.compile(r"(?P<typ>[NRW])(?P<oid>\d+)(:(?P<cls>\w+))?")
 
  21     """ Splits a unique identifier for places into its components.
 
  22         As place_ids cannot be used for testing, we use a unique
 
  23         identifier instead that is of the form <osmtype><osmid>[:<class>].
 
  26     def __init__(self, oid):
 
  27         self.typ = self.oid = self.cls = None
 
  30             m = ID_REGEX.fullmatch(oid)
 
  31             assert m is not None, \
 
  32                    "ID '{}' not of form <osmtype><osmid>[:<class>]".format(oid)
 
  34             self.typ = m.group('typ')
 
  35             self.oid = m.group('oid')
 
  36             self.cls = m.group('cls')
 
  40             return self.typ + self.oid
 
  42         return '{self.typ}{self.oid}:{self.cls}'.format(self=self)
 
  44     def query_osm_id(self, cur, query):
 
  45         """ Run a query on cursor `cur` using osm ID, type and class. The
 
  46             `query` string must contain exactly one placeholder '{}' where
 
  47             the 'where' query should go.
 
  49         where = 'osm_type = %s and osm_id = %s'
 
  50         params = [self.typ, self. oid]
 
  52         if self.cls is not None:
 
  53             where += ' and class = %s'
 
  54             params.append(self.cls)
 
  56         cur.execute(query.format(where), params)
 
  58     def row_by_place_id(self, cur, table, extra_columns=None):
 
  59         """ Get a row by place_id from the given table using cursor `cur`.
 
  60             extra_columns may contain a list additional elements for the select
 
  63         pid = self.get_place_id(cur)
 
  64         query = "SELECT {} FROM {} WHERE place_id = %s".format(
 
  65                     ','.join(['*'] + (extra_columns or [])), table)
 
  66         cur.execute(query, (pid, ))
 
  68     def get_place_id(self, cur, allow_empty=False):
 
  69         """ Look up the place id for the ID. Throws an assertion if the ID
 
  72         self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
 
  73         if cur.rowcount == 0 and allow_empty:
 
  76         assert cur.rowcount == 1, \
 
  77                "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)
 
  79         return cur.fetchone()['place_id']
 
  83     """ Represents a row from a database and offers comparison functions.
 
  85     def __init__(self, nid, db_row, context):
 
  88         self.context = context
 
  90     def assert_row(self, row, exclude_columns):
 
  91         """ Check that all columns of the given behave row are contained
 
  92             in the database row. Exclude behave rows with the names given
 
  93             in the `exclude_columns` list.
 
  95         for name, value in zip(row.headings, row.cells):
 
  96             if name not in exclude_columns:
 
  97                 assert self.contains(name, value), self.assert_msg(name, value)
 
  99     def contains(self, name, expected):
 
 100         """ Check that the DB row contains a column `name` with the given value.
 
 103             column, field = name.split('+', 1)
 
 104             return self._contains_hstore_value(column, field, expected)
 
 106         if name == 'geometry':
 
 107             return self._has_geometry(expected)
 
 109         if name not in self.db_row:
 
 112         actual = self.db_row[name]
 
 115             return actual is None
 
 117         if name == 'name' and ':' not in expected:
 
 118             return self._compare_column(actual[name], expected)
 
 120         if 'place_id' in name:
 
 121             return self._compare_place_id(actual, expected)
 
 123         if name == 'centroid':
 
 124             return self._has_centroid(expected)
 
 126         return self._compare_column(actual, expected)
 
 128     def _contains_hstore_value(self, column, field, expected):
 
 132         if column not in self.db_row:
 
 136             return self.db_row[column] is None or field not in self.db_row[column]
 
 138         if self.db_row[column] is None:
 
 141         return self._compare_column(self.db_row[column].get(field), expected)
 
 143     def _compare_column(self, actual, expected):
 
 144         if isinstance(actual, dict):
 
 145             return actual == eval('{' + expected + '}')
 
 147         return str(actual) == expected
 
 149     def _compare_place_id(self, actual, expected):
 
 153        with self.context.db.cursor() as cur:
 
 154             return NominatimID(expected).get_place_id(cur) == actual
 
 156     def _has_centroid(self, expected):
 
 157         if expected == 'in geometry':
 
 158             with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
 
 159                 cur.execute("""SELECT ST_Within(ST_SetSRID(ST_Point(%(cx)s, %(cy)s), 4326),
 
 160                                         ST_SetSRID(%(geomtxt)s::geometry, 4326))""",
 
 162                 return cur.fetchone()[0]
 
 165             x, y = expected.split(' ')
 
 167             x, y = self.context.osm.grid_node(int(expected))
 
 169         return Almost(float(x)) == self.db_row['cx'] and Almost(float(y)) == self.db_row['cy']
 
 171     def _has_geometry(self, expected):
 
 172         geom = self.context.osm.parse_geometry(expected)
 
 173         with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
 
 174             cur.execute(pysql.SQL("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
 
 175                                    ST_SnapToGrid(ST_SetSRID({}::geometry, 4326), 0.00001, 0.00001))""")
 
 176                              .format(pysql.SQL(geom),
 
 177                                      pysql.Literal(self.db_row['geomtxt'])))
 
 178             return cur.fetchone()[0]
 
 180     def assert_msg(self, name, value):
 
 181         """ Return a string with an informative message for a failed compare.
 
 183         msg = "\nBad column '{}' in row '{!s}'.".format(name, self.nid)
 
 184         actual = self._get_actual(name)
 
 185         if actual is not None:
 
 186             msg += " Expected: {}, got: {}.".format(value, actual)
 
 188             msg += " No such column."
 
 190         return msg + "\nFull DB row: {}".format(json.dumps(dict(self.db_row), indent=4, default=str))
 
 192     def _get_actual(self, name):
 
 194             column, field = name.split('+', 1)
 
 197             return (self.db_row.get(column) or {}).get(field)
 
 199         if name == 'geometry':
 
 200             return self.db_row['geomtxt']
 
 202         if name not in self.db_row:
 
 205         if name == 'centroid':
 
 206             return "POINT({cx} {cy})".format(**self.db_row)
 
 208         actual = self.db_row[name]
 
 210         if 'place_id' in name:
 
 217             with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
 
 218                 cur.execute("""SELECT osm_type, osm_id, class
 
 219                                FROM placex WHERE place_id = %s""",
 
 222                 if cur.rowcount == 1:
 
 223                     return "{0[0]}{0[1]}:{0[2]}".format(cur.fetchone())
 
 225                 return "[place ID {} not found]".format(actual)