1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Tests for functions to maintain the artificial postcode table.
15 from psycopg.rows import tuple_row
17 from nominatim_db.tools import postcodes
18 from nominatim_db.data import country_info
20 import dummy_tokenizer
24 def insert_implicit_postcode(placex_row, place_postcode_row, country_row):
25 """ Insert data into the placex and place table
26 which can then be used to compute one postcode.
28 def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False):
29 country_row(country=country, names={"name": country})
32 placex_row(osm_id=osm_id, country=country, geom=geometry,
34 address={'postcode': postcode})
36 place_postcode_row(osm_id=osm_id, centroid=geometry,
37 country=country, postcode=postcode)
38 return _insert_implicit_postcode
42 def insert_postcode_area(place_postcode_row, country_row):
43 """ Insert an area around a centroid to the postcode table.
45 def _do(osm_id, country, postcode, x, y):
46 country_row(country=country, names={"name": country})
48 x1, x2, y1, y2 = x - 0.001, x + 0.001, y - 0.001, y + 0.001
49 place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
50 centroid=f"POINT({x} {y})",
51 geom=f"POLYGON(({x1} {y1}, {x1} {y2}, {x2} {y2}, {x2} {y1}, {x1} {y1}))")
56 def postcode_update(dsn, temp_db_conn):
57 tokenizer = dummy_tokenizer.DummyTokenizer(None)
59 def _do(data_path=None):
60 with temp_db_conn.cursor() as cur:
61 cur.execute("""CREATE TRIGGER location_postcodes_before_update
62 BEFORE UPDATE ON location_postcodes
63 FOR EACH ROW EXECUTE PROCEDURE postcodes_update()""")
64 cur.execute("""CREATE TRIGGER location_postcodes_before_delete
65 BEFORE DELETE ON location_postcodes
66 FOR EACH ROW EXECUTE PROCEDURE postcodes_delete()""")
67 cur.execute("""CREATE TRIGGER location_postcodes_before_insert
68 BEFORE INSERT ON location_postcodes
69 FOR EACH ROW EXECUTE PROCEDURE postcodes_insert()""")
71 postcodes.update_postcodes(dsn, data_path, tokenizer)
76 @pytest.fixture(autouse=True)
77 def setup(self, def_config, postcode_table, placex_table, place_postcode_table,
78 load_sql, temp_db_conn):
79 self.conn = temp_db_conn
80 country_info.setup_country_config(def_config)
81 load_sql('functions/postcode_triggers.sql')
83 temp_db_conn.execute("""
84 CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
89 CREATE OR REPLACE FUNCTION get_country_code(place geometry)
94 CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
95 RETURNS GEOMETRY AS $$
96 SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
102 with self.conn.cursor(row_factory=tuple_row) as cur:
103 cur.execute("""SELECT osm_id, country_code, postcode,
104 ST_X(centroid), ST_Y(centroid)
105 FROM location_postcodes""")
106 return {r for r in cur}
108 def test_postcodes_empty(self, postcode_update):
111 assert not self.row_set
113 @pytest.mark.parametrize('in_placex', [True, False])
114 def test_postcodes_add_new_point(self, postcode_update, postcode_row,
115 insert_implicit_postcode, in_placex):
116 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
117 postcode_row('yy', '9486', 99, 34)
121 assert self.row_set == {(None, 'xx', '9486', 10, 12), }
123 def test_postcodes_add_new_area(self, postcode_update, insert_postcode_area):
124 insert_postcode_area(345, 'de', '10445', 23.5, 46.2)
128 assert self.row_set == {(345, 'de', '10445', 23.5, 46.2)}
130 @pytest.mark.parametrize('in_placex', [True, False])
131 def test_postcodes_add_area_and_point(self, postcode_update, insert_postcode_area,
132 insert_implicit_postcode, in_placex):
133 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
134 insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
138 assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
140 @pytest.mark.parametrize('in_placex', [True, False])
141 def test_postcodes_add_point_within_area(self, postcode_update, insert_postcode_area,
142 insert_implicit_postcode, in_placex):
143 insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
144 insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
148 assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
@pytest.mark.parametrize(
    'coords',
    [(99, 34), (10, 34), (99, 12), (9, 34), (9, 11), (23, 11)])
def test_postcodes_replace_coordinates(self, postcode_update, postcode_row, tmp_path,
                                       insert_implicit_postcode, coords):
    """ A row prepared with differing coordinates must be reported at
        the position of the implicit postcode, POINT(10 12), once the
        update has run.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    # Same country and postcode as the implicit entry, different position.
    old_x, old_y = coords
    postcode_row('xx', 'AB 4511', old_x, old_y)

    postcode_update(tmp_path)

    expected_rows = {(None, 'xx', 'AB 4511', 10, 12)}
    assert self.row_set == expected_rows
161 def test_postcodes_replace_coordinates_close(self, postcode_update, postcode_row,
162 insert_implicit_postcode):
163 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
164 postcode_row('xx', 'AB 4511', 10, 11.99999999)
168 assert self.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}
170 def test_postcodes_remove_point(self, postcode_update, postcode_row,
171 insert_implicit_postcode):
172 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
173 postcode_row('xx', 'badname', 10, 12)
177 assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
179 def test_postcodes_ignore_empty_country(self, postcode_update, insert_implicit_postcode):
180 insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')
182 assert not self.row_set
184 def test_postcodes_remove_all(self, postcode_update, postcode_row, place_postcode_table):
185 postcode_row('ch', '5613', 10, 12)
188 assert not self.row_set
190 def test_postcodes_multi_country(self, postcode_update,
191 insert_implicit_postcode):
192 insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
193 insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
194 insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
195 insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')
199 assert self.row_set == {(None, 'de', '54451', 10, 12),
200 (None, 'de', '54452', 10.3, 11.0),
201 (None, 'cc', '54452', 10.3, 11.0),
202 (None, 'cc', 'DD23 T', 100, 56)}
204 @pytest.mark.parametrize("gzipped", [True, False])
205 def test_postcodes_extern_jsonl(self, postcode_update, tmp_path,
206 insert_implicit_postcode, gzipped):
207 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
209 extfile = tmp_path / 'xx_postcodes_geometry.jsonl'
211 json.dumps({'properties': {'postcode': 'CD 4511'},
212 # Centroid : 0.5, 0.5
213 'geometry': {'type': 'Polygon', 'coordinates': [[
214 [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n' +
215 json.dumps({'properties': {'postcode': '822114', 'lat': 0.1, 'lon': 0.2},
216 'geometry': {'type': 'Polygon', 'coordinates': [[
217 [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n', encoding='utf-8')
220 subprocess.run(['gzip', str(extfile)])
221 assert not extfile.is_file()
223 postcode_update(tmp_path)
225 assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
226 (None, 'xx', 'CD 4511', 0.5, 0.5),
227 (None, 'xx', '822114', 0.2, 0.1)}
229 def test_postcodes_extern_jsonl_invalid_data(self, postcode_update, tmp_path,
230 insert_implicit_postcode):
231 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
233 extfile = tmp_path / 'xx_postcodes_geometry.jsonl'
236 '{"geometry": {"type": "Polygon"}, "properties": {"postcode": "BAD"}\n'
238 + json.dumps({'properties': {'postcode': 'NOGEOM'}}) + '\n'
239 # Invalid geometry type
240 + json.dumps({'geometry': {'type': 'Point', 'coordinates': [0, 0]},
241 'properties': {'postcode': 'BADGEOM'}}) + '\n'
243 + json.dumps({'properties': {'lat': 0, 'lon': 0},
244 'geometry': {'type': 'Polygon', 'coordinates': [[
245 [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n'
247 + json.dumps({'geometry': {'type': 'Polygon', 'coordinates': [[
248 [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]},
249 'properties': {'postcode': 'GOOD'}}) + '\n', encoding='utf-8')
251 postcode_update(tmp_path)
253 assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
254 (None, 'xx', 'GOOD', 0.5, 0.5)}
256 def test_postcodes_extern_jsonl_overwrite_past_import(self, postcode_update, tmp_path,
257 postcode_row, country_row):
258 country_row(country="xx", names={"name": "xx"})
259 postcode_row('xx', '822114', 83.8, 24.1, True) # area from past geometry import
260 postcode_row('xx', '110000', 77.2, 28.6, True)
262 extfile = tmp_path / 'xx_postcodes_geometry.jsonl'
264 # overwrites past postcode
265 json.dumps({'properties': {'postcode': '822114'},
266 'geometry': {'type': 'Polygon', 'coordinates': [[
267 [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n', encoding='utf-8')
269 postcode_update(tmp_path)
271 # postcode 110000 no longer exists in the import file and is thus deleted
272 assert self.row_set == {(None, 'xx', '822114', 0.5, 0.5)}
274 @pytest.mark.parametrize("gzipped", [True, False])
275 def test_postcodes_extern_csv(self, postcode_update, tmp_path,
276 insert_implicit_postcode, gzipped):
277 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
279 extfile = tmp_path / 'xx_postcodes.csv'
280 extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')
283 subprocess.run(['gzip', str(extfile)])
284 assert not extfile.is_file()
286 postcode_update(tmp_path)
288 assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
289 (None, 'xx', 'CD 4511', -10, -5)}
def test_postcodes_extern_csv_bad_column(self, postcode_update, tmp_path,
                                         insert_implicit_postcode):
    """ An external CSV file whose header reads 'postode' instead of
        'postcode' must not contribute any rows: only the implicit
        postcode is expected after the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    # The header line carries the misspelt column name under test.
    csv_content = "postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10"
    csv_path = tmp_path / 'xx_postcodes.csv'
    csv_path.write_text(csv_content, encoding='utf-8')

    postcode_update(tmp_path)

    expected_rows = {(None, 'xx', 'AB 4511', 10, 12)}
    assert self.row_set == expected_rows
302 def test_postcodes_extern_csv_bad_number(self, postcode_update, insert_implicit_postcode,
304 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
306 extfile = tmp_path / 'xx_postcodes.csv'
308 "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0", encoding='utf-8')
310 postcode_update(tmp_path)
312 assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
313 (None, 'xx', 'CD 4511', -10, -5)}
315 def test_postcodes_import_precedence(self, postcode_update, tmp_path,
316 insert_implicit_postcode, insert_postcode_area):
317 # osm area precedes all external imports
318 insert_postcode_area(3, 'xx', '110000', 77.2, 28.6)
319 # guessed postcode area from osm points, should be overwritten by geometry import
320 insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '822114')
321 insert_implicit_postcode(2, 'xx', 'POINT(0 12)', '822114')
323 josnlfile = tmp_path / 'xx_postcodes_geometry.jsonl'
324 josnlfile.write_text(
325 # ignored, osm area takes precedence over geometry import
326 json.dumps({'properties': {'postcode': '110000', 'lat': 0.1, 'lon': 0.2},
327 'geometry': {'type': 'Polygon', 'coordinates': [[
328 [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n' +
329 # geometry import takes precedence over guessed postcode from osm points
330 json.dumps({'properties': {'postcode': '822114'},
331 'geometry': {'type': 'Polygon', 'coordinates': [[
332 [0, 0], [0, 10], [10, 10], [10, 0], [0, 0]]]}}) + '\n' +
333 # geometry import takes precedence over csv import
334 json.dumps({'properties': {'postcode': '822115'},
335 'geometry': {'type': 'Polygon', 'coordinates': [[
336 [0, 0], [0, 2], [2, 2], [2, 0], [0, 0]]]}}) + '\n', encoding='utf-8')
338 csvfile = tmp_path / 'xx_postcodes.csv'
341 "822115,0.2,0.2\n" # ignored, geometry import takes precedence over csv import
342 "822116,-5,-10", # added
345 postcode_update(tmp_path)
347 assert self.row_set == {(None, 'xx', '822114', 5, 5),
348 (None, 'xx', '822115', 1, 1),
349 (3, 'xx', '110000', 77.2, 28.6),
350 (None, 'xx', '822116', -10, -5)}
352 def test_no_placex_entry(self, postcode_update, temp_db_cursor, place_postcode_row):
353 # Rewrite the get_country_code function to verify its execution.
354 temp_db_cursor.execute("""
355 CREATE OR REPLACE FUNCTION get_country_code(place geometry) RETURNS TEXT AS $$
356 SELECT 'yy' $$ LANGUAGE sql""")
357 place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')
360 assert self.row_set == {(None, 'yy', 'AB 4511', 10, 12)}
362 def test_discard_badly_formatted_postcodes(self, postcode_update, place_postcode_row):
363 place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')
366 assert not self.row_set
def test_can_compute(dsn, table_factory):
    """ postcodes.can_compute() must report false before a table named
        'place_postcode' is created and true afterwards.
    """
    possible_before = postcodes.can_compute(dsn)
    assert not possible_before

    table_factory('place_postcode')

    possible_after = postcodes.can_compute(dsn)
    assert possible_after