1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Tests for functions to maintain the artificial postcode table.
"""
14 from nominatim_db.tools import postcodes
15 from nominatim_db.data import country_info
16 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
18 import dummy_tokenizer
class MockPostcodeTable:
    """ A location_postcodes table for testing.

        Creating an instance loads 'functions/postcode_triggers.sql' through
        the SQLPreprocessor, creates the location_postcodes table and installs
        simple stand-ins for the SQL helper functions. Rows can be injected
        with add() and inspected through the row_set property.
    """
    def __init__(self, conn, config):
        # add() and row_set reuse this connection later on.
        self.conn = conn
        SQLPreprocessor(conn, config).run_sql_file(conn, 'functions/postcode_triggers.sql')
        with conn.cursor() as cur:
            # place_id and postcode are written by add(), osm_id is read by
            # row_set, so all three must exist in the table.
            # NOTE(review): rank_search is included on the assumption that the
            # trigger functions need it - confirm against postcode_triggers.sql.
            cur.execute("""CREATE TABLE location_postcodes (
                                place_id BIGINT,
                                osm_id BIGINT,
                                parent_place_id BIGINT,
                                rank_search SMALLINT,
                                indexed_status SMALLINT,
                                indexed_date TIMESTAMP,
                                country_code varchar(2),
                                postcode TEXT,
                                geometry GEOMETRY(Geometry, 4326),
                                centroid GEOMETRY(Point, 4326))""")
            # Stand-ins: postcodes are returned unchanged by the normalizer
            # and no country is ever derived from a geometry.
            cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
                           RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;

                           CREATE OR REPLACE FUNCTION get_country_code(place geometry)
                           RETURNS TEXT AS $$ BEGIN
                            RETURN null;
                           END; $$ LANGUAGE plpgsql;
                        """)
            cur.execute("""CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
                           RETURNS GEOMETRY AS $$
                            SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
                           $$ LANGUAGE sql;""")
        # NOTE(review): committed on the assumption that the code under test
        # opens its own connection (it is handed a DSN, not this connection).
        conn.commit()

    def add(self, country, postcode, x, y):
        """ Insert a postcode row for the given country with its centroid
            at (x, y) and a geometry expanded by 0.005 around that point.
            The row gets indexed_status 1 and no osm_id.
        """
        # NOTE(review): relies on a sequence 'seq_place' that is created
        # outside of this class - presumably by another fixture.
        with self.conn.cursor() as cur:
            cur.execute(
                """INSERT INTO location_postcodes
                     (place_id, indexed_status, country_code, postcode, centroid, geometry)
                   VALUES (nextval('seq_place'), 1, %(cc)s, %(pc)s,
                           ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
                           ST_Expand(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), 0.005))""",
                {'cc': country, 'pc': postcode, 'x': x, 'y': y})
        self.conn.commit()

    @property
    def row_set(self):
        """ Return the table content as a set of tuples
            (osm_id, country_code, postcode, x, y), where x and y are the
            coordinates of the centroid.
        """
        with self.conn.cursor() as cur:
            cur.execute("""SELECT osm_id, country_code, postcode,
                                  ST_X(centroid), ST_Y(centroid)
                           FROM location_postcodes""")
            return {tuple(row) for row in cur}
@pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table, table_factory):
    """ Provide a MockPostcodeTable on the temporary test database.

        The country configuration is set up from the default config and a
        'country_name' table with the partitions 0, 1 and 2 is created
        before the mock table is built.
    """
    # Registered as a fixture because the tests request it by parameter name.
    country_info.setup_country_config(def_config)
    table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
    return MockPostcodeTable(temp_db_conn, def_config)
@pytest.fixture
def insert_implicit_postcode(placex_table, place_postcode_row):
    """ Insert data into the placex and place table
        which can then be used to compute one postcode.

        The fixture value is a function taking
        (osm_id, country, geometry, postcode, in_placex=False).
    """
    def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False):
        # NOTE(review): 'in_placex' is taken to select the table the row goes
        # into: placex when set, the place postcode table otherwise. Confirm
        # that the two inserts are meant to be mutually exclusive.
        if in_placex:
            placex_table.add(osm_id=osm_id, country=country, geom=geometry,
                             centroid=f'SRID=4326;{geometry}',
                             address={'postcode': postcode})
        else:
            place_postcode_row(osm_id=osm_id, centroid=geometry,
                               country=country, postcode=postcode)

    return _insert_implicit_postcode
@pytest.fixture
def insert_postcode_area(place_postcode_row):
    """ Insert an area around a centroid to the postcode table.

        The fixture value is a function taking (osm_id, country, postcode,
        x, y). It adds a relation row whose geometry is a square reaching
        0.001 in each direction from the centroid (x, y).
    """
    def _do(osm_id, country, postcode, x, y):
        x1, x2, y1, y2 = x - 0.001, x + 0.001, y - 0.001, y + 0.001
        place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
                           centroid=f"POINT({x} {y})",
                           geom=f"POLYGON(({x1} {y1}, {x1} {y2}, {x2} {y2}, {x2} {y1}, {x1} {y1}))")

    # The tests call the fixture value, so the inner function must be handed out.
    return _do
@pytest.fixture
def postcode_update(dsn, temp_db_conn):
    """ Provide a function that runs postcodes.update_postcodes() against
        the test database using a dummy tokenizer.

        The returned function accepts an optional 'data_path', which is
        passed through to update_postcodes() unchanged. Before the update
        runs, it installs the insert, update and delete triggers on
        location_postcodes.
    """
    tokenizer = dummy_tokenizer.DummyTokenizer(None)

    def _do(data_path=None):
        # The triggers are only created here, i.e. after the test has had
        # the chance to fill the table directly.
        # NOTE(review): the trigger functions are presumably defined by
        # 'functions/postcode_triggers.sql' - confirm.
        with temp_db_conn.cursor() as cur:
            cur.execute("""CREATE TRIGGER location_postcodes_before_update
                            BEFORE UPDATE ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_update()""")
            cur.execute("""CREATE TRIGGER location_postcodes_before_delete
                            BEFORE DELETE ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_delete()""")
            cur.execute("""CREATE TRIGGER location_postcodes_before_insert
                            BEFORE INSERT ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_insert()""")
        temp_db_conn.commit()

        postcodes.update_postcodes(dsn, data_path, tokenizer)

    # The tests call the fixture value, so the inner function must be handed out.
    return _do
def test_postcodes_empty(postcode_update, postcode_table, place_postcode_table):
    """ When the test has added no postcode data, the postcode table
        is empty after an update.
    """
    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert not postcode_table.row_set
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_new_point(postcode_update, postcode_table,
                                 insert_implicit_postcode, in_placex):
    """ A postcode found in the source data is added as a row without
        osm_id at the source location, while the pre-existing row for
        country 'yy', which has no source data, disappears.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
    postcode_table.add('yy', '9486', 99, 34)

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', '9486', 10, 12), }
def test_postcodes_add_new_area(postcode_update, insert_postcode_area, postcode_table):
    """ A postcode area is added with the osm_id of the area and the
        centroid it was inserted with.
    """
    insert_postcode_area(345, 'de', '10445', 23.5, 46.2)

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert postcode_table.row_set == {(345, 'de', '10445', 23.5, 46.2)}
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_area_and_point(postcode_update, insert_postcode_area,
                                      insert_implicit_postcode, postcode_table, in_placex):
    """ When a point source and an area exist for the same postcode in the
        same country, only the row for the area ends up in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_point_within_area(postcode_update, insert_postcode_area,
                                         insert_implicit_postcode, postcode_table, in_placex):
    """ A point source carrying a different postcode but located at the
        centroid of a postcode area produces no row of its own; only the
        row for the area ends up in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
@pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
                                    (9, 34), (9, 11), (23, 11)])
def test_postcodes_replace_coordinates(postcode_update, postcode_table, tmp_path,
                                       insert_implicit_postcode, coords):
    """ An existing postcode row whose centroid differs from the source
        location is moved to the source location by the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    old_x, old_y = coords
    postcode_table.add('xx', 'AB 4511', old_x, old_y)

    postcode_update(tmp_path)

    expected_rows = {(None, 'xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected_rows
def test_postcodes_replace_coordinates_close(postcode_update, postcode_table,
                                             insert_implicit_postcode):
    """ An existing postcode row whose centroid is only minimally off the
        source location keeps its coordinates through the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'AB 4511', 10, 11.99999999)

    # Run the update under test; without it the assertion would only
    # check the row inserted above.
    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}
def test_postcodes_remove_point(postcode_update, postcode_table,
                                insert_implicit_postcode):
    """ An existing postcode row that has no counterpart in the source data
        is removed, while the postcode from the source data is added.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'badname', 10, 12)

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
def test_postcodes_ignore_empty_country(postcode_update, postcode_table,
                                        insert_implicit_postcode):
    """ Source data without a country code does not produce a postcode row.
    """
    insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')

    # Run the update under test; without it the table is trivially empty.
    postcode_update()

    assert not postcode_table.row_set
def test_postcodes_remove_all(postcode_update, postcode_table, place_postcode_table):
    """ When the test has added no source data, an existing postcode row
        is removed by the update.
    """
    postcode_table.add('ch', '5613', 10, 12)

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert not postcode_table.row_set
def test_postcodes_multi_country(postcode_update, postcode_table,
                                 insert_implicit_postcode):
    """ Postcodes are kept apart by country: the same postcode at the same
        location in two countries results in two rows.
    """
    insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
    insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
    insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
    insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert postcode_table.row_set == {(None, 'de', '54451', 10, 12),
                                      (None, 'de', '54452', 10.3, 11.0),
                                      (None, 'cc', '54452', 10.3, 11.0),
                                      (None, 'cc', 'DD23 T', 100, 56)}
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(postcode_update, postcode_table, tmp_path,
                          insert_implicit_postcode, gzipped):
    """ Postcodes from the file 'xx_postcodes.csv' in the data directory
        are added, in plain and in gzipped form. A postcode that is also
        present in the source data keeps the location from the source data.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    # Only compress for the gzipped variant, otherwise both parameter values
    # would exercise exactly the same input.
    if gzipped:
        # check=True makes a failing gzip visible right where it happens.
        subprocess.run(['gzip', str(extfile)], check=True)
        assert not extfile.is_file()

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                      (None, 'xx', 'CD 4511', -10, -5)}
def test_postcodes_extern_bad_column(postcode_update, postcode_table, tmp_path,
                                     insert_implicit_postcode):
    """ No postcode is taken from an external file whose header has
        'postode' in place of the 'postcode' column.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    csv_content = "postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10"
    (tmp_path / 'xx_postcodes.csv').write_text(csv_content)

    postcode_update(tmp_path)

    expected_rows = {(None, 'xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected_rows
def test_postcodes_extern_bad_number(postcode_update, insert_implicit_postcode,
                                     postcode_table, tmp_path):
    """ Lines of an external file with a NaN longitude or a latitude of 200
        are left out, the remaining line is still imported.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    csv_content = "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0"
    (tmp_path / 'xx_postcodes.csv').write_text(csv_content)

    postcode_update(tmp_path)

    expected_rows = {
        (None, 'xx', 'AB 4511', 10, 12),
        (None, 'xx', 'CD 4511', -10, -5),
    }
    assert postcode_table.row_set == expected_rows
def test_can_compute(dsn, table_factory):
    """ can_compute() reports False until the 'place_postcode' table
        has been created and True afterwards.
    """
    before_table = postcodes.can_compute(dsn)
    assert not before_table

    table_factory('place_postcode')

    after_table = postcodes.can_compute(dsn)
    assert after_table
def test_no_placex_entry(postcode_update, temp_db_cursor, place_postcode_row, postcode_table):
    """ A postcode source row without a country gets the country code
        that the SQL function get_country_code() returns.
    """
    # Rewrite the get_country_code function to verify its execution.
    # It has to return the 'yy' that the assertion below looks for.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN
            RETURN 'yy';
        END; $$ LANGUAGE plpgsql;
    """)
    place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')

    # Run the update under test; the assertion checks its outcome.
    postcode_update()

    assert postcode_table.row_set == {(None, 'yy', 'AB 4511', 10, 12)}
def test_discard_badly_formatted_postcodes(postcode_update, place_postcode_row, postcode_table):
    """ The postcode 'AB 4511' given for country 'fr' does not make it
        into the postcode table.
    """
    # NOTE(review): presumably rejected because it does not match the
    # postcode pattern configured for 'fr' - confirm in the country settings.
    place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')

    # Run the update under test; without it the table is trivially empty.
    postcode_update()

    assert not postcode_table.row_set