]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_postcodes.py
reorganise fixtures for placex table
[nominatim.git] / test / python / tools / test_postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for functions to maintain the artificial postcode table.
9 """
10 import subprocess
11
12 import pytest
13
14 from nominatim_db.tools import postcodes
15 from nominatim_db.data import country_info
16 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
17
18 import dummy_tokenizer
19
20
21 class MockPostcodeTable:
22     """ A location_postcodes table for testing.
23     """
24     def __init__(self, conn, config):
25         self.conn = conn
26         SQLPreprocessor(conn, config).run_sql_file(conn, 'functions/postcode_triggers.sql')
27         with conn.cursor() as cur:
28             cur.execute("""CREATE TABLE location_postcodes (
29                                place_id BIGINT,
30                                osm_id BIGINT,
31                                parent_place_id BIGINT,
32                                rank_search SMALLINT,
33                                indexed_status SMALLINT,
34                                indexed_date TIMESTAMP,
35                                country_code varchar(2),
36                                postcode TEXT,
37                                geometry GEOMETRY(Geometry, 4326),
38                                centroid GEOMETRY(Point, 4326))""")
39             cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
40                            RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
41
42                            CREATE OR REPLACE FUNCTION get_country_code(place geometry)
43                            RETURNS TEXT AS $$ BEGIN
44                            RETURN null;
45                            END; $$ LANGUAGE plpgsql;
46                         """)
47             cur.execute("""CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
48                            RETURNS GEOMETRY AS $$
49                            SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
50                            $$ LANGUAGE sql;""")
51
52         conn.commit()
53
54     def add(self, country, postcode, x, y):
55         with self.conn.cursor() as cur:
56             cur.execute(
57                 """INSERT INTO location_postcodes
58                        (place_id, indexed_status, country_code, postcode, centroid, geometry)
59                      VALUES (nextval('seq_place'), 1, %(cc)s, %(pc)s,
60                              ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
61                              ST_Expand(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), 0.005))""",
62                 {'cc': country, 'pc': postcode, 'x': x, 'y': y})
63
64         self.conn.commit()
65
66     @property
67     def row_set(self):
68         with self.conn.cursor() as cur:
69             cur.execute("""SELECT osm_id, country_code, postcode,
70                                   ST_X(centroid), ST_Y(centroid)
71                            FROM location_postcodes""")
72             return set((tuple(row) for row in cur))
73
74
75 @pytest.fixture
76 def postcode_table(def_config, temp_db_conn, placex_table, table_factory):
77     country_info.setup_country_config(def_config)
78     return MockPostcodeTable(temp_db_conn, def_config)
79
80
81 @pytest.fixture
82 def insert_implicit_postcode(placex_row, place_postcode_row):
83     """ Insert data into the placex and place table
84         which can then be used to compute one postcode.
85     """
86     def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False):
87         if in_placex:
88             placex_row(osm_id=osm_id, country=country, geom=geometry,
89                        centroid=geometry, address={'postcode': postcode})
90         else:
91             place_postcode_row(osm_id=osm_id, centroid=geometry,
92                                country=country, postcode=postcode)
93
94     return _insert_implicit_postcode
95
96
97 @pytest.fixture
98 def insert_postcode_area(place_postcode_row):
99     """ Insert an area around a centroid to the postcode table.
100     """
101     def _do(osm_id, country, postcode, x, y):
102         x1, x2, y1, y2 = x - 0.001, x + 0.001, y - 0.001, y + 0.001
103         place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
104                            centroid=f"POINT({x} {y})",
105                            geom=f"POLYGON(({x1} {y1}, {x1} {y2}, {x2} {y2}, {x2} {y1}, {x1} {y1}))")
106
107     return _do
108
109
110 @pytest.fixture
111 def postcode_update(dsn, temp_db_conn):
112     tokenizer = dummy_tokenizer.DummyTokenizer(None)
113
114     def _do(data_path=None):
115         with temp_db_conn.cursor() as cur:
116             cur.execute("""CREATE TRIGGER location_postcodes_before_update
117                             BEFORE UPDATE ON location_postcodes
118                             FOR EACH ROW EXECUTE PROCEDURE postcodes_update()""")
119             cur.execute("""CREATE TRIGGER location_postcodes_before_delete
120                             BEFORE DELETE ON location_postcodes
121                             FOR EACH ROW EXECUTE PROCEDURE postcodes_delete()""")
122             cur.execute("""CREATE TRIGGER location_postcodes_before_insert
123                             BEFORE INSERT ON location_postcodes
124                             FOR EACH ROW EXECUTE PROCEDURE postcodes_insert()""")
125         temp_db_conn.commit()
126
127         postcodes.update_postcodes(dsn, data_path, tokenizer)
128
129     return _do
130
131
132 def test_postcodes_empty(postcode_update, postcode_table, place_postcode_table):
133     postcode_update()
134
135     assert not postcode_table.row_set
136
137
138 @pytest.mark.parametrize('in_placex', [True, False])
139 def test_postcodes_add_new_point(postcode_update, postcode_table,
140                                  insert_implicit_postcode, in_placex):
141     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
142     postcode_table.add('yy', '9486', 99, 34)
143
144     postcode_update()
145
146     assert postcode_table.row_set == {(None, 'xx', '9486', 10, 12), }
147
148
149 def test_postcodes_add_new_area(postcode_update, insert_postcode_area, postcode_table):
150     insert_postcode_area(345, 'de', '10445', 23.5, 46.2)
151
152     postcode_update()
153
154     assert postcode_table.row_set == {(345, 'de', '10445', 23.5, 46.2)}
155
156
157 @pytest.mark.parametrize('in_placex', [True, False])
158 def test_postcodes_add_area_and_point(postcode_update, insert_postcode_area,
159                                       insert_implicit_postcode, postcode_table, in_placex):
160     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
161     insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
162
163     postcode_update()
164
165     assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
166
167
168 @pytest.mark.parametrize('in_placex', [True, False])
169 def test_postcodes_add_point_within_area(postcode_update, insert_postcode_area,
170                                          insert_implicit_postcode, postcode_table, in_placex):
171     insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
172     insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)
173
174     postcode_update()
175
176     assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}
177
178
179 @pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
180                                     (9, 34), (9, 11), (23, 11)])
181 def test_postcodes_replace_coordinates(postcode_update, postcode_table, tmp_path,
182                                        insert_implicit_postcode, coords):
183     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
184     postcode_table.add('xx', 'AB 4511', *coords)
185
186     postcode_update(tmp_path)
187
188     assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
189
190
191 def test_postcodes_replace_coordinates_close(postcode_update, postcode_table,
192                                              insert_implicit_postcode):
193     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
194     postcode_table.add('xx', 'AB 4511', 10, 11.99999999)
195
196     postcode_update()
197
198     assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}
199
200
201 def test_postcodes_remove_point(postcode_update, postcode_table,
202                                 insert_implicit_postcode):
203     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
204     postcode_table.add('xx', 'badname', 10, 12)
205
206     postcode_update()
207
208     assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
209
210
211 def test_postcodes_ignore_empty_country(postcode_update, postcode_table,
212                                         insert_implicit_postcode):
213     insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')
214     postcode_update()
215     assert not postcode_table.row_set
216
217
218 def test_postcodes_remove_all(postcode_update, postcode_table, place_postcode_table):
219     postcode_table.add('ch', '5613', 10, 12)
220     postcode_update()
221
222     assert not postcode_table.row_set
223
224
225 def test_postcodes_multi_country(postcode_update, postcode_table,
226                                  insert_implicit_postcode):
227     insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
228     insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
229     insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
230     insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')
231
232     postcode_update()
233
234     assert postcode_table.row_set == {(None, 'de', '54451', 10, 12),
235                                       (None, 'de', '54452', 10.3, 11.0),
236                                       (None, 'cc', '54452', 10.3, 11.0),
237                                       (None, 'cc', 'DD23 T', 100, 56)}
238
239
240 @pytest.mark.parametrize("gzipped", [True, False])
241 def test_postcodes_extern(postcode_update, postcode_table, tmp_path,
242                           insert_implicit_postcode, gzipped):
243     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
244
245     extfile = tmp_path / 'xx_postcodes.csv'
246     extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')
247
248     if gzipped:
249         subprocess.run(['gzip', str(extfile)])
250         assert not extfile.is_file()
251
252     postcode_update(tmp_path)
253
254     assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
255                                       (None, 'xx', 'CD 4511', -10, -5)}
256
257
258 def test_postcodes_extern_bad_column(postcode_update, postcode_table, tmp_path,
259                                      insert_implicit_postcode):
260     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
261
262     extfile = tmp_path / 'xx_postcodes.csv'
263     extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')
264
265     postcode_update(tmp_path)
266
267     assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}
268
269
270 def test_postcodes_extern_bad_number(postcode_update, insert_implicit_postcode,
271                                      postcode_table, tmp_path):
272     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
273
274     extfile = tmp_path / 'xx_postcodes.csv'
275     extfile.write_text(
276         "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0", encoding='utf-8')
277
278     postcode_update(tmp_path)
279
280     assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
281                                       (None, 'xx', 'CD 4511', -10, -5)}
282
283
284 def test_can_compute(dsn, table_factory):
285     assert not postcodes.can_compute(dsn)
286     table_factory('place_postcode')
287     assert postcodes.can_compute(dsn)
288
289
290 def test_no_placex_entry(postcode_update, temp_db_cursor, place_postcode_row, postcode_table):
291     # Rewrite the get_country_code function to verify its execution.
292     temp_db_cursor.execute("""
293         CREATE OR REPLACE FUNCTION get_country_code(place geometry)
294         RETURNS TEXT AS $$ BEGIN
295         RETURN 'yy';
296         END; $$ LANGUAGE plpgsql;
297     """)
298     place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')
299     postcode_update()
300
301     assert postcode_table.row_set == {(None, 'yy', 'AB 4511', 10, 12)}
302
303
304 def test_discard_badly_formatted_postcodes(postcode_update, place_postcode_row, postcode_table):
305     place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')
306     postcode_update()
307
308     assert not postcode_table.row_set