1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Tests for functions to maintain the artificial postcode table.
14 from nominatim_db.tools import postcodes
15 from nominatim_db.data import country_info
16 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
18 import dummy_tokenizer
21 class MockPostcodeTable:
22 """ A location_postcodes table for testing.
24 def __init__(self, conn, config):
26 SQLPreprocessor(conn, config).run_sql_file(conn, 'functions/postcode_triggers.sql')
27 with conn.cursor() as cur:
28 cur.execute("""CREATE TABLE location_postcodes (
31 parent_place_id BIGINT,
33 indexed_status SMALLINT,
34 indexed_date TIMESTAMP,
35 country_code varchar(2),
37 geometry GEOMETRY(Geometry, 4326),
38 centroid GEOMETRY(Point, 4326))""")
39 cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
40 RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
42 CREATE OR REPLACE FUNCTION get_country_code(place geometry)
43 RETURNS TEXT AS $$ BEGIN
45 END; $$ LANGUAGE plpgsql;
47 cur.execute("""CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
48 RETURNS GEOMETRY AS $$
49 SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
54 def add(self, country, postcode, x, y):
55 with self.conn.cursor() as cur:
57 """INSERT INTO location_postcodes
58 (place_id, indexed_status, country_code, postcode, centroid, geometry)
59 VALUES (nextval('seq_place'), 1, %(cc)s, %(pc)s,
60 ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
61 ST_Expand(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), 0.005))""",
62 {'cc': country, 'pc': postcode, 'x': x, 'y': y})
68 with self.conn.cursor() as cur:
69 cur.execute("""SELECT osm_id, country_code, postcode,
70 ST_X(centroid), ST_Y(centroid)
71 FROM location_postcodes""")
72 return set((tuple(row) for row in cur))
def postcode_table(def_config, temp_db_conn, placex_table, table_factory):
    """ Load the country configuration and hand out a MockPostcodeTable
        created on the temporary database connection.

        `placex_table` and `table_factory` are not used in the body;
        presumably they are requested for the database setup they perform.
    """
    country_info.setup_country_config(def_config)
    mock_table = MockPostcodeTable(temp_db_conn, def_config)
    return mock_table
def insert_implicit_postcode(placex_row, place_postcode_row):
    """ Insert data into the placex and place table
        which can then be used to compute one postcode.

        Returns a function taking `osm_id`, `country`, `geometry` and
        `postcode`. The optional `in_placex` flag selects the target:
        when true the row is created through `placex_row` with the
        postcode in its address, otherwise through `place_postcode_row`.
    """
    def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False):
        # Exactly one of the two tables receives the row, so that the
        # parametrized tests cover both sources independently.
        if in_placex:
            placex_row(osm_id=osm_id, country=country, geom=geometry,
                       centroid=geometry, address={'postcode': postcode})
        else:
            place_postcode_row(osm_id=osm_id, centroid=geometry,
                               country=country, postcode=postcode)

    return _insert_implicit_postcode
def insert_postcode_area(place_postcode_row):
    """ Insert an area around a centroid to the postcode table.

        Returns a function taking `osm_id`, `country`, `postcode` and the
        centroid coordinates `x`, `y`. It creates a relation row whose
        geometry is a square reaching 0.001 units from the centroid in
        each direction.
    """
    def _do(osm_id, country, postcode, x, y):
        x1, x2, y1, y2 = x - 0.001, x + 0.001, y - 0.001, y + 0.001
        place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
                           centroid=f"POINT({x} {y})",
                           geom=f"POLYGON(({x1} {y1}, {x1} {y2}, {x2} {y2}, {x2} {y1}, {x1} {y1}))")

    # Hand the helper to the caller; tests invoke the result directly.
    return _do
def postcode_update(dsn, temp_db_conn):
    """ Provide a function that installs the location_postcodes triggers
        and then runs `postcodes.update_postcodes()` with a dummy tokenizer.

        The returned function takes an optional `data_path`, which is
        passed through to `update_postcodes()` unchanged.
    """
    tokenizer = dummy_tokenizer.DummyTokenizer(None)

    def _do(data_path=None):
        with temp_db_conn.cursor() as cur:
            # One BEFORE trigger per row event, each bound to the
            # matching postcodes_<event>() procedure.
            for event in ('update', 'delete', 'insert'):
                cur.execute(f"""CREATE TRIGGER location_postcodes_before_{event}
                                BEFORE {event.upper()} ON location_postcodes
                                FOR EACH ROW EXECUTE PROCEDURE postcodes_{event}()""")
        temp_db_conn.commit()

        postcodes.update_postcodes(dsn, data_path, tokenizer)

    return _do
def test_postcodes_empty(postcode_update, postcode_table, place_postcode_table):
    """ An update without any source data leaves the postcode table empty.
    """
    postcode_update()

    assert not postcode_table.row_set
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_new_point(postcode_update, postcode_table,
                                 insert_implicit_postcode, in_placex):
    """ A postcode found in the source data is added as a point entry,
        while a pre-existing entry for another country without source
        data is dropped.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
    postcode_table.add('yy', '9486', 99, 34)

    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', '9486', 10, 12), }
def test_postcodes_add_new_area(postcode_update, insert_postcode_area, postcode_table):
    """ A postcode area is imported with its OSM id and centroid.
    """
    insert_postcode_area(345, 'de', '10445', 23.5, 46.2)

    postcode_update()

    assert postcode_table.row_set == {(345, 'de', '10445', 23.5, 46.2)}
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_area_and_point(postcode_update, insert_postcode_area,
                                      insert_implicit_postcode, postcode_table, in_placex):
    """ When an area and a point carry the same postcode in the same
        country, only the area entry ends up in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    postcode_update()

    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_point_within_area(postcode_update, insert_postcode_area,
                                         insert_implicit_postcode, postcode_table, in_placex):
    """ A point with a different postcode located inside a postcode area
        does not produce an entry of its own.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    postcode_update()

    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
@pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
                                    (9, 34), (9, 11), (23, 11)])
def test_postcodes_replace_coordinates(postcode_update, postcode_table, tmp_path,
                                       insert_implicit_postcode, coords):
    """ An existing entry whose coordinates differ from the source data
        gets the source coordinates after the update.
    """
    old_x, old_y = coords
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'AB 4511', old_x, old_y)

    postcode_update(tmp_path)

    expected = {(None, 'xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_replace_coordinates_close(postcode_update, postcode_table,
                                             insert_implicit_postcode):
    """ An existing entry whose coordinates differ only minimally from
        the source data keeps its stored coordinates.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'AB 4511', 10, 11.99999999)

    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}
def test_postcodes_remove_point(postcode_update, postcode_table,
                                insert_implicit_postcode):
    """ An existing entry whose postcode no longer appears in the source
        data is removed during the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'badname', 10, 12)

    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
def test_postcodes_ignore_empty_country(postcode_update, postcode_table,
                                        insert_implicit_postcode):
    """ Source postcodes without a country code produce no entry.
    """
    insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')

    postcode_update()

    assert not postcode_table.row_set
def test_postcodes_remove_all(postcode_update, postcode_table, place_postcode_table):
    """ With no source data at all, existing entries are removed.
    """
    postcode_table.add('ch', '5613', 10, 12)

    postcode_update()

    assert not postcode_table.row_set
def test_postcodes_multi_country(postcode_update, postcode_table,
                                 insert_implicit_postcode):
    """ Postcodes are kept per country: the same postcode at the same
        location in two countries yields two separate entries.
    """
    insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
    insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
    insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
    insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')

    postcode_update()

    assert postcode_table.row_set == {(None, 'de', '54451', 10, 12),
                                      (None, 'de', '54452', 10.3, 11.0),
                                      (None, 'cc', '54452', 10.3, 11.0),
                                      (None, 'cc', 'DD23 T', 100, 56)}
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(postcode_update, postcode_table, tmp_path,
                          insert_implicit_postcode, gzipped):
    """ Postcodes from an external `<country>_postcodes.csv` file, plain
        or gzipped, are added. A postcode that also exists in the source
        data keeps the coordinates computed from the source data.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')

    if gzipped:
        # gzip replaces the file in place; fail loudly if it did not run.
        subprocess.run(['gzip', str(extfile)], check=True)
        assert not extfile.is_file()

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                      (None, 'xx', 'CD 4511', -10, -5)}
def test_postcodes_extern_bad_column(postcode_update, postcode_table, tmp_path,
                                     insert_implicit_postcode):
    """ An external file whose header misspells the postcode column
        contributes nothing; only the source-data entry is present.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    csv_content = "postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10"
    (tmp_path / 'xx_postcodes.csv').write_text(csv_content, encoding='utf-8')

    postcode_update(tmp_path)

    expected = {(None, 'xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_extern_bad_number(postcode_update, insert_implicit_postcode,
                                     postcode_table, tmp_path):
    """ Rows of an external file with unusable coordinates (a NaN
        longitude, a latitude of 200) are skipped while the valid rows
        of the same file are still imported.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text(
        "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0", encoding='utf-8')

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                      (None, 'xx', 'CD 4511', -10, -5)}
def test_can_compute(dsn, table_factory):
    """ `can_compute()` reports False until the place_postcode table
        has been created and True afterwards.
    """
    before = postcodes.can_compute(dsn)
    assert not before

    table_factory('place_postcode')

    after = postcodes.can_compute(dsn)
    assert after
def test_no_placex_entry(postcode_update, temp_db_cursor, place_postcode_row, postcode_table):
    """ A postcode row without a country gets its country code from
        the database function get_country_code().
    """
    # Rewrite the get_country_code function to verify its execution.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN
        RETURN 'yy';
        END; $$ LANGUAGE plpgsql;
    """)
    place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')

    postcode_update()

    # The 'yy' country code can only come from the rewritten function.
    assert postcode_table.row_set == {(None, 'yy', 'AB 4511', 10, 12)}
def test_discard_badly_formatted_postcodes(postcode_update, place_postcode_row, postcode_table):
    """ A postcode that is badly formatted for its country produces no
        entry (presumably 'AB 4511' does not match the pattern that the
        country configuration defines for 'fr').
    """
    place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')

    postcode_update()

    assert not postcode_table.row_set