]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_postcodes.py
prepare release 5.3.2.post6
[nominatim.git] / test / python / tools / test_postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for functions to maintain the artificial postcode table.
9 """
10 import subprocess
11 import json
12
13 import pytest
14
15 from psycopg.rows import tuple_row
16
17 from nominatim_db.tools import postcodes
18 from nominatim_db.data import country_info
19
20 import dummy_tokenizer
21
22
@pytest.fixture
def insert_implicit_postcode(placex_row, place_postcode_row, country_row):
    """ Provide a helper that registers a country row and then stores a
        single postcode-carrying object, either through the placex_row
        fixture (with the postcode in the address) or through the
        place_postcode_row fixture.
    """
    def _insert(osm_id, country, geometry, postcode, in_placex=False):
        # The country is always registered first, named after its own code.
        country_row(country=country, names={"name": country})

        if not in_placex:
            place_postcode_row(osm_id=osm_id, centroid=geometry,
                               country=country, postcode=postcode)
            return

        # In placex the same geometry serves as geometry and centroid.
        placex_row(osm_id=osm_id, country=country, geom=geometry,
                   centroid=geometry,
                   address={'postcode': postcode})
    return _insert
39
40
@pytest.fixture
def insert_postcode_area(place_postcode_row, country_row):
    """ Provide a helper that registers a country row and adds a postcode
        relation whose geometry is a square reaching 0.001 units in each
        direction from the given centroid.
    """
    def _insert_area(osm_id, country, postcode, x, y):
        country_row(country=country, names={"name": country})

        west, east = x - 0.001, x + 0.001
        south, north = y - 0.001, y + 0.001
        # Closed ring: the first corner is repeated at the end.
        corners = ((west, south), (west, north), (east, north),
                   (east, south), (west, south))
        ring = ", ".join(f"{cx} {cy}" for cx, cy in corners)

        place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
                           centroid=f"POINT({x} {y})",
                           geom=f"POLYGON(({ring}))")
    return _insert_area
53
54
@pytest.fixture
def postcode_update(dsn, temp_db_conn):
    """ Provide a callable that installs the update, delete and insert
        triggers on location_postcodes, commits, and then runs
        postcodes.update_postcodes() with a dummy tokenizer.

        NOTE(review): the triggers are created on every call, so the
        callable is presumably meant to be invoked once per test.
    """
    tokenizer = dummy_tokenizer.DummyTokenizer(None)

    def _run_update(data_path=None):
        with temp_db_conn.cursor() as cur:
            # Same statements and order as before: update, delete, insert.
            for event in ('update', 'delete', 'insert'):
                cur.execute(f"""CREATE TRIGGER location_postcodes_before_{event}
                            BEFORE {event.upper()} ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_{event}()""")
        temp_db_conn.commit()
        postcodes.update_postcodes(dsn, data_path, tokenizer)
    return _run_update
73
74
class TestPostcodes:
    """ Tests for postcodes.update_postcodes() run against a temporary
        database.
    """

    @pytest.fixture(autouse=True)
    def setup(self, def_config, postcode_table, placex_table, place_postcode_table,
              load_sql, temp_db_conn):
        """ Prepare every test: remember the connection, set up the country
            configuration, load the postcode trigger functions and create
            simple SQL stand-ins for the helper functions below.
        """
        self.conn = temp_db_conn
        country_info.setup_country_config(def_config)
        load_sql('functions/postcode_triggers.sql')

        # Stand-ins: postcodes are returned unchanged, no country is derived
        # from a geometry, and expansion yields the envelope of a buffer.
        temp_db_conn.execute("""
            CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
            RETURNS TEXT AS $$
              SELECT postcode
            $$ LANGUAGE sql;

            CREATE OR REPLACE FUNCTION get_country_code(place geometry)
            RETURNS TEXT AS $$
              SELECT NULL
            $$ LANGUAGE sql;

            CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
            RETURNS GEOMETRY AS $$
              SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
            $$ LANGUAGE sql;
        """)

    @property
    def row_set(self):
        """ Return the content of location_postcodes as a set of
            (osm_id, country_code, postcode, x, y) tuples.
        """
        with self.conn.cursor(row_factory=tuple_row) as cur:
            cur.execute("""SELECT osm_id, country_code, postcode,
                                  ST_X(centroid), ST_Y(centroid)
                           FROM location_postcodes""")
            return set(cur)

    def test_postcodes_empty(self, postcode_update):
        """ Updating without any input data leaves the table empty.
        """
        postcode_update()

        assert not self.row_set

    @pytest.mark.parametrize('in_placex', [True, False])
    def test_postcodes_add_new_point(self, postcode_update, postcode_row,
                                     insert_implicit_postcode, in_placex):
        """ A postcode point is added for its country while a pre-existing
            row with the same postcode in another country disappears.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
        postcode_row('yy', '9486', 99, 34)

        postcode_update()

        assert self.row_set == {(None, 'xx', '9486', 10, 12), }

    def test_postcodes_add_new_area(self, postcode_update, insert_postcode_area):
        """ A postcode area results in a row carrying the area's OSM id and
            centroid.
        """
        insert_postcode_area(345, 'de', '10445', 23.5, 46.2)

        postcode_update()

        assert self.row_set == {(345, 'de', '10445', 23.5, 46.2)}

    @pytest.mark.parametrize('in_placex', [True, False])
    def test_postcodes_add_area_and_point(self, postcode_update, insert_postcode_area,
                                          insert_implicit_postcode, in_placex):
        """ When a point and an area share country and postcode, only the
            area row is expected.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
        insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

        postcode_update()

        assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}

    @pytest.mark.parametrize('in_placex', [True, False])
    def test_postcodes_add_point_within_area(self, postcode_update, insert_postcode_area,
                                             insert_implicit_postcode, in_placex):
        """ A point with a different postcode placed on the centroid of a
            postcode area does not produce a row of its own.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
        insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

        postcode_update()

        assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}

    @pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
                                        (9, 34), (9, 11), (23, 11)])
    def test_postcodes_replace_coordinates(self, postcode_update, postcode_row, tmp_path,
                                           insert_implicit_postcode, coords):
        """ A pre-existing row with differing coordinates ends up at the
            position of the inserted postcode point.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
        postcode_row('xx', 'AB 4511', *coords)

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}

    def test_postcodes_replace_coordinates_close(self, postcode_update, postcode_row,
                                                 insert_implicit_postcode):
        """ A pre-existing row that is only minimally off keeps its stored
            coordinates.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
        postcode_row('xx', 'AB 4511', 10, 11.99999999)

        postcode_update()

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}

    def test_postcodes_remove_point(self, postcode_update, postcode_row,
                                    insert_implicit_postcode):
        """ A pre-existing row whose postcode is not in the input data is
            removed while the inserted postcode is added.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
        postcode_row('xx', 'badname', 10, 12)

        postcode_update()

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}

    def test_postcodes_ignore_empty_country(self, postcode_update, insert_implicit_postcode):
        """ A postcode without a country yields no row (the stubbed
            get_country_code() from setup returns NULL).
        """
        insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')
        postcode_update()
        assert not self.row_set

    def test_postcodes_remove_all(self, postcode_update, postcode_row, place_postcode_table):
        """ With no input data, a pre-existing row is removed.
        """
        postcode_row('ch', '5613', 10, 12)
        postcode_update()

        assert not self.row_set

    def test_postcodes_multi_country(self, postcode_update,
                                     insert_implicit_postcode):
        """ Postcodes are kept per country: the same postcode at the same
            position in two countries yields two rows.
        """
        insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
        insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
        insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
        insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')

        postcode_update()

        assert self.row_set == {(None, 'de', '54451', 10, 12),
                                (None, 'de', '54452', 10.3, 11.0),
                                (None, 'cc', '54452', 10.3, 11.0),
                                (None, 'cc', 'DD23 T', 100, 56)}

    @pytest.mark.parametrize("gzipped", [True, False])
    def test_postcodes_extern_jsonl(self, postcode_update, tmp_path,
                                    insert_implicit_postcode, gzipped):
        """ Postcodes from a (optionally gzipped) geometry JSONL file are
            added; explicit lat/lon properties are expected as the centroid,
            otherwise the centroid of the polygon.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes_geometry.jsonl'
        extfile.write_text(
            json.dumps({'properties': {'postcode': 'CD 4511'},
                        # Centroid : 0.5, 0.5
                        'geometry': {'type': 'Polygon', 'coordinates': [[
                            [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n' +
            json.dumps({'properties': {'postcode': '822114', 'lat': 0.1, 'lon': 0.2},
                        'geometry': {'type': 'Polygon', 'coordinates': [[
                            [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n', encoding='utf-8')

        if gzipped:
            # check=True makes a failing gzip surface with its own error
            # instead of only through the assertion below.
            subprocess.run(['gzip', str(extfile)], check=True)
            assert not extfile.is_file()

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                (None, 'xx', 'CD 4511', 0.5, 0.5),
                                (None, 'xx', '822114', 0.2, 0.1)}

    def test_postcodes_extern_jsonl_invalid_data(self, postcode_update, tmp_path,
                                                 insert_implicit_postcode):
        """ Broken lines in the geometry JSONL file are skipped; only the
            valid entry is imported.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes_geometry.jsonl'
        extfile.write_text(
            # Invalid JSON
            '{"geometry": {"type": "Polygon"}, "properties": {"postcode": "BAD"}\n'
            # Missing geometry
            + json.dumps({'properties': {'postcode': 'NOGEOM'}}) + '\n'
            # Invalid geometry type
            + json.dumps({'geometry': {'type': 'Point', 'coordinates': [0, 0]},
                          'properties': {'postcode': 'BADGEOM'}}) + '\n'
            # Missing postcode
            + json.dumps({'properties': {'lat': 0, 'lon': 0},
                          'geometry': {'type': 'Polygon', 'coordinates': [[
                              [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n'
            # Valid one
            + json.dumps({'geometry': {'type': 'Polygon', 'coordinates': [[
                [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]},
                'properties': {'postcode': 'GOOD'}}) + '\n', encoding='utf-8')

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                (None, 'xx', 'GOOD', 0.5, 0.5)}

    def test_postcodes_extern_jsonl_overwrite_past_import(self, postcode_update, tmp_path,
                                                          postcode_row, country_row):
        """ A new geometry import replaces rows from a past one: postcodes
            still in the file are updated, the others are removed.
        """
        country_row(country="xx", names={"name": "xx"})
        postcode_row('xx', '822114', 83.8, 24.1, True)  # area from past geometry import
        postcode_row('xx', '110000', 77.2, 28.6, True)

        extfile = tmp_path / 'xx_postcodes_geometry.jsonl'
        extfile.write_text(
            # overwrites past postcode
            json.dumps({'properties': {'postcode': '822114'},
                        'geometry': {'type': 'Polygon', 'coordinates': [[
                            [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n', encoding='utf-8')

        postcode_update(tmp_path)

        # Postcode 110000 no longer exists in the import file, thus deleted.
        assert self.row_set == {(None, 'xx', '822114', 0.5, 0.5)}

    @pytest.mark.parametrize("gzipped", [True, False])
    def test_postcodes_extern_csv(self, postcode_update, tmp_path,
                                  insert_implicit_postcode, gzipped):
        """ Postcodes from a (optionally gzipped) CSV file are added unless
            the postcode is already known from the inserted data.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes.csv'
        extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')

        if gzipped:
            # check=True makes a failing gzip surface with its own error
            # instead of only through the assertion below.
            subprocess.run(['gzip', str(extfile)], check=True)
            assert not extfile.is_file()

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                (None, 'xx', 'CD 4511', -10, -5)}

    def test_postcodes_extern_csv_bad_column(self, postcode_update, tmp_path,
                                             insert_implicit_postcode):
        """ A CSV file without a 'postcode' column (the header is
            deliberately misspelled) contributes no rows.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes.csv'
        extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}

    def test_postcodes_extern_csv_bad_number(self, postcode_update, insert_implicit_postcode,
                                             tmp_path):
        """ CSV rows with unusable coordinates (NaN, latitude 200) are
            skipped while the remaining row is imported.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes.csv'
        extfile.write_text(
            "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0", encoding='utf-8')

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                (None, 'xx', 'CD 4511', -10, -5)}

    def test_postcodes_import_precedence(self, postcode_update, tmp_path,
                                         insert_implicit_postcode, insert_postcode_area):
        """ Check precedence between sources: OSM areas win over the geometry
            import, which wins over postcodes guessed from OSM points and
            over the CSV import.
        """
        # osm area precedes all external imports
        insert_postcode_area(3, 'xx', '110000', 77.2, 28.6)
        # guessed postcode area from osm points, should be overwritten by geometry import
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '822114')
        insert_implicit_postcode(2, 'xx', 'POINT(0 12)', '822114')

        jsonlfile = tmp_path / 'xx_postcodes_geometry.jsonl'
        jsonlfile.write_text(
            # ignored, osm area takes precedence over geometry import
            json.dumps({'properties': {'postcode': '110000', 'lat': 0.1, 'lon': 0.2},
                        'geometry': {'type': 'Polygon', 'coordinates': [[
                            [0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]]}}) + '\n' +
            # geometry import takes precedence over guessed postcode from osm points
            json.dumps({'properties': {'postcode': '822114'},
                        'geometry': {'type': 'Polygon', 'coordinates': [[
                            [0, 0], [0, 10], [10, 10], [10, 0], [0, 0]]]}}) + '\n' +
            # geometry import takes precedence over csv import
            json.dumps({'properties': {'postcode': '822115'},
                        'geometry': {'type': 'Polygon', 'coordinates': [[
                            [0, 0], [0, 2], [2, 2], [2, 0], [0, 0]]]}}) + '\n', encoding='utf-8')

        csvfile = tmp_path / 'xx_postcodes.csv'
        csvfile.write_text(
            "postcode,lat,lon\n"
            "822115,0.2,0.2\n"  # ignored, geometry import takes precedence over csv import
            "822116,-5,-10",  # added
            encoding='utf-8')

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', '822114', 5, 5),
                                (None, 'xx', '822115', 1, 1),
                                (3, 'xx', '110000', 77.2, 28.6),
                                (None, 'xx', '822116', -10, -5)}

    def test_no_placex_entry(self, postcode_update, temp_db_cursor, place_postcode_row):
        """ A postcode inserted without a country gets the country returned
            by get_country_code().
        """
        # Rewrite the get_country_code function to verify its execution.
        temp_db_cursor.execute("""
            CREATE OR REPLACE FUNCTION get_country_code(place geometry) RETURNS TEXT AS $$
              SELECT 'yy' $$ LANGUAGE sql""")
        place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')
        postcode_update()

        assert self.row_set == {(None, 'yy', 'AB 4511', 10, 12)}

    def test_discard_badly_formatted_postcodes(self, postcode_update, place_postcode_row):
        """ The postcode 'AB 4511' in country 'fr' is discarded (presumably
            because it does not match that country's configured format).
        """
        place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')
        postcode_update()

        assert not self.row_set
367
368
def test_can_compute(dsn, table_factory):
    """ can_compute() is false until a place_postcode table exists.
    """
    available_before = postcodes.can_compute(dsn)
    assert not available_before

    table_factory('place_postcode')

    available_after = postcodes.can_compute(dsn)
    assert available_after