]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_postcodes.py
release 5.2.0.post7
[nominatim.git] / test / python / tools / test_postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for functions to maintain the artificial postcode table.
9 """
10 import subprocess
11
12 import pytest
13
14 from nominatim_db.tools import postcodes
15 from nominatim_db.data import country_info
16 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
17
18 import dummy_tokenizer
19
20
class MockPostcodeTable:
    """ Stand-in for the location_postcodes table used by the tests.

        Instantiating the class creates the table and the SQL helper
        functions on the given connection. Rows can be inserted with
        add() and the current content is available through row_set.
    """
    def __init__(self, conn, config):
        """ Load the postcode trigger functions, create the table and
            the stub SQL functions, then commit.
        """
        self.conn = conn
        preprocessor = SQLPreprocessor(conn, config)
        preprocessor.run_sql_file(conn, 'functions/postcode_triggers.sql')
        with conn.cursor() as cursor:
            cursor.execute(
                """CREATE TABLE location_postcodes (
                               place_id BIGINT,
                               osm_id BIGINT,
                               parent_place_id BIGINT,
                               rank_search SMALLINT,
                               indexed_status SMALLINT,
                               indexed_date TIMESTAMP,
                               country_code varchar(2),
                               postcode TEXT,
                               geometry GEOMETRY(Geometry, 4326),
                               centroid GEOMETRY(Point, 4326))""")
            # Stubs: the normalisation returns the postcode unchanged and
            # the country lookup always yields NULL.
            cursor.execute(
                """CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
                           RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;

                           CREATE OR REPLACE FUNCTION get_country_code(place geometry)
                           RETURNS TEXT AS $$ BEGIN
                           RETURN null;
                           END; $$ LANGUAGE plpgsql;
                        """)
            cursor.execute(
                """CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
                           RETURNS GEOMETRY AS $$
                           SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
                           $$ LANGUAGE sql;""")

        conn.commit()

    def add(self, country, postcode, x, y):
        """ Insert a postcode row with a point centroid at (x, y) and a
            geometry expanded by 0.005 around that point, then commit.
        """
        values = {'cc': country, 'pc': postcode, 'x': x, 'y': y}
        with self.conn.cursor() as cursor:
            cursor.execute(
                """INSERT INTO location_postcodes
                       (place_id, indexed_status, country_code, postcode, centroid, geometry)
                     VALUES (nextval('seq_place'), 1, %(cc)s, %(pc)s,
                             ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
                             ST_Expand(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), 0.005))""",
                values)

        self.conn.commit()

    @property
    def row_set(self):
        """ Return the table content as a set of tuples
            (osm_id, country_code, postcode, centroid x, centroid y).
        """
        query = """SELECT osm_id, country_code, postcode,
                                  ST_X(centroid), ST_Y(centroid)
                           FROM location_postcodes"""
        with self.conn.cursor() as cursor:
            cursor.execute(query)
            return {tuple(row) for row in cursor}
73
74
@pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table, table_factory):
    """ Set up the country configuration and a country_name table with
        the partitions 0, 1 and 2 and return a fresh MockPostcodeTable.
    """
    country_info.setup_country_config(def_config)

    partitions = ((0, ), (1, ), (2, ))
    table_factory('country_name', 'partition INT', partitions)

    table = MockPostcodeTable(temp_db_conn, def_config)
    return table
80
81
@pytest.fixture
def insert_implicit_postcode(placex_table, place_postcode_row):
    """ Provide a helper that stores one postcode in the source data:
        either as the address postcode of a placex row (in_placex=True)
        or as a row created through place_postcode_row (the default).
    """
    def _insert(osm_id, country, geometry, postcode, in_placex=False):
        if not in_placex:
            place_postcode_row(osm_id=osm_id, centroid=geometry,
                               country=country, postcode=postcode)
            return

        placex_table.add(osm_id=osm_id, country=country, geom=geometry,
                         centroid=f'SRID=4326;{geometry}',
                         address={'postcode': postcode})

    return _insert
97
98
@pytest.fixture
def insert_postcode_area(place_postcode_row):
    """ Provide a helper that adds a postcode relation whose geometry is
        a square reaching 0.001 in each direction from the centroid (x, y).
    """
    def _add_area(osm_id, country, postcode, x, y):
        west, east = x - 0.001, x + 0.001
        south, north = y - 0.001, y + 0.001
        # Closed ring: the first corner is repeated at the end.
        ring = (f"{west} {south}, {west} {north}, {east} {north}, "
                f"{east} {south}, {west} {south}")
        place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
                           centroid=f"POINT({x} {y})",
                           geom=f"POLYGON(({ring}))")

    return _add_area
110
111
@pytest.fixture
def postcode_update(dsn, temp_db_conn):
    """ Provide a function that installs the update, delete and insert
        triggers on location_postcodes and then runs
        postcodes.update_postcodes() with a dummy tokenizer.
    """
    tokenizer = dummy_tokenizer.DummyTokenizer(None)

    def _run_update(data_path=None):
        with temp_db_conn.cursor() as cur:
            # One BEFORE trigger per row event, in the order
            # update, delete, insert.
            for event in ('update', 'delete', 'insert'):
                cur.execute(f"""CREATE TRIGGER location_postcodes_before_{event}
                            BEFORE {event.upper()} ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_{event}()""")
        temp_db_conn.commit()

        postcodes.update_postcodes(dsn, data_path, tokenizer)

    return _run_update
132
133
def test_postcodes_empty(postcode_update, postcode_table, place_postcode_table):
    """ An update without any inserted postcodes leaves the table empty.
    """
    postcode_update()

    assert postcode_table.row_set == set()
138
139
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_new_point(postcode_update, postcode_table,
                                 insert_implicit_postcode, in_placex):
    """ After the update only the postcode from the source data is in
        the table; the pre-existing entry for country 'yy' is gone.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
    postcode_table.add('yy', '9486', 99, 34)

    postcode_update()

    expected = {(None, 'xx', '9486', 10, 12)}
    assert postcode_table.row_set == expected
149
150
def test_postcodes_add_new_area(postcode_update, insert_postcode_area, postcode_table):
    """ A postcode area shows up in the table with its OSM id and centroid.
    """
    insert_postcode_area(345, 'de', '10445', 23.5, 46.2)

    postcode_update()

    expected = {(345, 'de', '10445', 23.5, 46.2)}
    assert postcode_table.row_set == expected
157
158
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_area_and_point(postcode_update, insert_postcode_area,
                                      insert_implicit_postcode, postcode_table, in_placex):
    """ When a point and an area carry the same postcode in the same
        country, only the row for the area is expected in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    postcode_update()

    expected = {(345, 'xx', '10445', 23.5, 46.2)}
    assert postcode_table.row_set == expected
168
169
@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_point_within_area(postcode_update, insert_postcode_area,
                                         insert_implicit_postcode, postcode_table, in_placex):
    """ A point postcode located at the centroid of a postcode area with
        a different postcode is expected not to appear in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    postcode_update()

    expected = {(345, 'xx', '10445', 23.5, 46.2)}
    assert postcode_table.row_set == expected
179
180
@pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
                                    (9, 34), (9, 11), (23, 11)])
def test_postcodes_replace_coordinates(postcode_update, postcode_table, tmp_path,
                                       insert_implicit_postcode, coords):
    """ An existing entry whose coordinates differ from the source data
        ends up with the coordinates of the source data.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    old_x, old_y = coords
    postcode_table.add('xx', 'AB 4511', old_x, old_y)

    postcode_update(tmp_path)

    expected = {(None, 'xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
191
192
def test_postcodes_replace_coordinates_close(postcode_update, postcode_table,
                                             insert_implicit_postcode):
    """ An existing entry that is only minimally off from the source data
        is expected to keep its stored coordinates.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'AB 4511', 10, 11.99999999)

    postcode_update()

    expected = {(None, 'xx', 'AB 4511', 10, 11.99999999)}
    assert postcode_table.row_set == expected
201
202
def test_postcodes_remove_point(postcode_update, postcode_table,
                                insert_implicit_postcode):
    """ An existing entry whose postcode does not occur in the source
        data is gone after the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'badname', 10, 12)

    postcode_update()

    expected = {(None, 'xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
211
212
def test_postcodes_ignore_empty_country(postcode_update, postcode_table,
                                        insert_implicit_postcode):
    """ A postcode inserted without a country is expected not to end up
        in the table.
    """
    insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')

    postcode_update()

    assert postcode_table.row_set == set()
218
219
def test_postcodes_remove_all(postcode_update, postcode_table, place_postcode_table):
    """ With no postcodes inserted as source data, an existing entry is
        removed by the update.
    """
    postcode_table.add('ch', '5613', 10, 12)

    postcode_update()

    assert postcode_table.row_set == set()
225
226
def test_postcodes_multi_country(postcode_update, postcode_table,
                                 insert_implicit_postcode):
    """ Postcodes are kept apart per country: the same postcode at the
        same location in two countries yields two rows.
    """
    source_rows = [(1, 'de', 'POINT(10 12)', '54451'),
                   (2, 'cc', 'POINT(100 56)', 'DD23 T'),
                   (3, 'de', 'POINT(10.3 11.0)', '54452'),
                   (4, 'cc', 'POINT(10.3 11.0)', '54452')]
    for osm_id, country, point, postcode in source_rows:
        insert_implicit_postcode(osm_id, country, point, postcode)

    postcode_update()

    expected = {(None, 'de', '54451', 10, 12),
                (None, 'de', '54452', 10.3, 11.0),
                (None, 'cc', '54452', 10.3, 11.0),
                (None, 'cc', 'DD23 T', 100, 56)}
    assert postcode_table.row_set == expected
240
241
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(postcode_update, postcode_table, tmp_path,
                          insert_implicit_postcode, gzipped):
    """ Postcodes from an external 'xx_postcodes.csv' file, plain or
        gzipped, in the directory handed to the update are added to the
        table. A postcode that is also present in the source data is
        expected to keep the coordinates from the source data.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    if gzipped:
        # check=True makes a failing gzip call abort the test right here
        # instead of surfacing indirectly through the assertion below.
        subprocess.run(['gzip', str(extfile)], check=True)
        # gzip replaces the file with a compressed copy.
        assert not extfile.is_file()

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                      (None, 'xx', 'CD 4511', -10, -5)}
258
259
def test_postcodes_extern_bad_column(postcode_update, postcode_table, tmp_path,
                                     insert_implicit_postcode):
    """ An external file whose header names the column 'postode' instead
        of 'postcode' contributes no rows to the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    csv_content = "postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10"
    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text(csv_content)

    postcode_update(tmp_path)

    expected = {(None, 'xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
270
271
def test_postcodes_extern_bad_number(postcode_update, insert_implicit_postcode,
                                     postcode_table, tmp_path):
    """ Rows of an external file with a NaN longitude or a latitude of
        200 are expected to be left out; the remaining row is added.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    csv_content = "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0"
    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text(csv_content)

    postcode_update(tmp_path)

    expected = {(None, 'xx', 'AB 4511', 10, 12),
                (None, 'xx', 'CD 4511', -10, -5)}
    assert postcode_table.row_set == expected
283
284
def test_can_compute(dsn, table_factory):
    """ can_compute() is falsy until the place_postcode table has been
        created and truthy afterwards.
    """
    without_table = postcodes.can_compute(dsn)
    assert not without_table

    table_factory('place_postcode')

    with_table = postcodes.can_compute(dsn)
    assert with_table
289
290
def test_no_placex_entry(postcode_update, temp_db_cursor, place_postcode_row, postcode_table):
    """ A postcode inserted without a country gets the country code
        returned by the get_country_code() SQL function.
    """
    # Replace get_country_code() with a version returning a fixed
    # country so that its use can be verified in the result.
    fixed_country_function = """
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN
        RETURN 'yy';
        END; $$ LANGUAGE plpgsql;
    """
    temp_db_cursor.execute(fixed_country_function)
    place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')

    postcode_update()

    expected = {(None, 'yy', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
303
304
def test_discard_badly_formatted_postcodes(postcode_update, place_postcode_row, postcode_table):
    """ The postcode 'AB 4511' inserted for country 'fr' is expected not
        to end up in the table (presumably because it does not match the
        postcode format configured for that country).
    """
    place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')

    postcode_update()

    assert postcode_table.row_set == set()