1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Helper functions to compare expected values.
11 import collections.abc
16 from psycopg import sql as pysql
17 from psycopg.rows import dict_row
18 from .geometry_alias import ALIASES
22 'exactly': lambda exp, act: exp == act,
23 'more than': lambda exp, act: act > exp,
24 'less than': lambda exp, act: act < exp,
29 return json.dumps(obj, sort_keys=True, indent=2)
32 def _pt_close(p1, p2):
33 return math.isclose(p1[0], p2[0], abs_tol=1e-07) \
34 and math.isclose(p1[1], p2[1], abs_tol=1e-07)
37 def within_box(value, expect):
38 coord = [float(x) for x in expect.split(',')]
40 if isinstance(value, str):
41 if value.startswith('POINT'):
42 value = value[6:-1].split(' ')
44 value = value.split(',')
45 value = list(map(float, value))
48 return coord[0] <= value[0] <= coord[2] \
49 and coord[1] <= value[1] <= coord[3]
52 return value[0] >= coord[0] and value[1] <= coord[1] \
53 and value[2] >= coord[2] and value[3] <= coord[3]
55 raise ValueError("Not a coordinate or bbox.")
59 None: lambda val, exp: str(val) == exp,
60 'i': lambda val, exp: str(val).lower() == exp.lower(),
61 'fm': lambda val, exp: re.fullmatch(exp, val) is not None,
62 'dict': lambda val, exp: (val is None if exp == '-'
63 else (val == ast.literal_eval('{' + exp + '}'))),
67 OSM_TYPE = {'node': 'n', 'way': 'w', 'relation': 'r',
68 'N': 'n', 'W': 'w', 'R': 'r'}
72 """ Returns the given attribute as a string.
74 The key parameter determines how the value is formatted before
75 returning. To refer to sub attributes, use '+' to add more keys
76 (e.g. 'name+ref' will access obj['name']['ref']). A '!' introduces
77 a formatting suffix. If no suffix is given, the value will be
78 converted using the str() function.
82 !:... - use a formatting expression according to Python Mini Format Spec
83 !i - make case-insensitive comparison
84 !fm - consider comparison string a regular expression and match full value
85 !wkt - convert the expected value to a WKT string before comparing
86 !in_box - the expected value is a comma-separated bbox description
89 def __init__(self, obj, key, grid=None):
93 self.key, self.fmt = key.rsplit('!', 1)
98 if self.key == 'object':
99 assert 'osm_id' in obj
100 assert 'osm_type' in obj
101 self.subobj = OSM_TYPE[obj['osm_type']] + str(obj['osm_id'])
105 self.subobj = self.obj
106 for sub in self.key.split('+'):
108 if isinstance(self.subobj, collections.abc.Sequence) and sub.isdigit():
110 assert sub < len(self.subobj), \
111 f"Out of bound index {done}. Full object:\n{_pretty(self.obj)}"
113 assert sub in self.subobj, \
114 f"Missing attribute {done}. Full object:\n{_pretty(self.obj)}"
115 self.subobj = self.subobj[sub]
117 def __eq__(self, other):
118 # work around bad quoting by pytest-bdd
119 if not isinstance(other, str):
120 return self.subobj == other
122 other = other.replace(r'\\', '\\')
124 if self.fmt in COMPARISON_FUNCS:
125 return COMPARISON_FUNCS[self.fmt](self.subobj, other)
127 if self.fmt.startswith(':'):
128 return other == f"{{{self.fmt}}}".format(self.subobj)
130 if self.fmt == 'wkt':
131 return self.compare_wkt(self.subobj, other)
133 raise RuntimeError(f"Unknown format string '{self.fmt}'.")
136 k = self.key.replace('+', '][')
139 return f"result[{k}]({self.subobj})"
141 def compare_wkt(self, value, expected):
142 """ Compare a WKT value against a compact geometry format.
143 The function understands the following formats:
145 country:<country code>
146 Point geometry guaranteed to be in the given country
154 <P> may either be a coordinate of the form '<x> <y>' or a single
155 number. In the latter case it must refer to a point in
156 a previously defined grid.
158 m = re.fullmatch(r'(POINT)\(([0-9. -]*)\)', value) \
159 or re.fullmatch(r'(LINESTRING)\(([0-9,. -]*)\)', value) \
160 or re.fullmatch(r'(POLYGON)\(\(([0-9,. -]*)\)\)', value)
164 converted = [list(map(float, pt.split(' ', 1)))
165 for pt in map(str.strip, m[2].split(','))]
167 if expected.startswith('country:'):
168 ccode = expected[8:].upper()
169 assert ccode in ALIASES, f"Geometry error: unknown country {ccode}"
170 return m[1] == 'POINT' and _pt_close(converted[0], ALIASES[ccode])
172 if ',' not in expected:
173 return m[1] == 'POINT' and _pt_close(converted[0], self.get_point(expected))
175 if '(' not in expected:
176 return m[1] == 'LINESTRING' and \
177 all(_pt_close(p1, p2) for p1, p2 in
178 zip(converted, (self.get_point(p) for p in expected.split(','))))
180 if m[1] != 'POLYGON':
183 # Polygon comparison is tricky because the polygons don't necessarily
184 # end at the same point or have the same winding order.
185 # Brute force all possible variants of the expected polygon
186 exp_coords = [self.get_point(p) for p in expected[1:-1].split(',')]
187 if exp_coords[0] != exp_coords[-1]:
188 raise RuntimeError(f"Invalid polygon {expected}. "
189 "First and last point need to be the same")
190 for line in (exp_coords[:-1], exp_coords[-1:0:-1]):
191 for i in range(len(line)):
192 if all(_pt_close(p1, p2) for p1, p2 in
193 zip(converted, line[i:] + line[:i])):
198 def get_point(self, pt):
201 return list(map(float, pt.split(' ', 1)))
205 return self.grid.get(pt)
208 def check_table_content(conn, tablename, data, grid=None, exact=False):
209 lines = set(range(1, len(data)))
214 cols.extend(('osm_id', 'osm_type'))
216 name, fmt = col.rsplit('!', 1)
217 if fmt in ('wkt', 'in_box'):
218 cols.append(f"ST_AsText({name}) as {name}")
220 cols.append(name.split('+')[0])
222 cols.append(col.split('+')[0])
224 with conn.cursor(row_factory=dict_row) as cur:
225 cur.execute(pysql.SQL(f"SELECT {','.join(cols)} FROM")
226 + pysql.Identifier(tablename))
230 table_content += '\n' + str(row)
232 for col, value in zip(data[0], data[i]):
233 if ResultAttr(row, col, grid=grid) != (None if value == '-' else value):
239 assert not exact, f"Unexpected row in table {tablename}: {row}"
242 "Rows not found:\n" \
243 + '\n'.join(str(data[i]) for i in lines) \
244 + "\nTable content:\n" \