git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_postcodes.py
add support for expanding interpolations on housenumbers
[nominatim.git] / test / python / tools / test_postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for functions to maintain the artificial postcode table.
9 """
10 import subprocess
11
12 import pytest
13
14 from psycopg.rows import tuple_row
15
16 from nominatim_db.tools import postcodes
17 from nominatim_db.data import country_info
18
19 import dummy_tokenizer
20
21
@pytest.fixture
def insert_implicit_postcode(placex_row, place_postcode_row):
    """ Provide a function that adds one source row carrying a postcode,
        either through the `placex_row` fixture or through the
        `place_postcode_row` fixture.
    """
    def _add_postcode_source(osm_id, country, geometry, postcode, in_placex=False):
        """ Add the postcode via `place_postcode_row` by default. With
            `in_placex` set, add it via `placex_row` instead, using
            `geometry` for both geometry and centroid and putting the
            postcode into the address.
        """
        if not in_placex:
            place_postcode_row(osm_id=osm_id, centroid=geometry,
                               country=country, postcode=postcode)
            return

        placex_row(osm_id=osm_id, country=country,
                   geom=geometry, centroid=geometry,
                   address={'postcode': postcode})

    return _add_postcode_source
36
37
@pytest.fixture
def insert_postcode_area(place_postcode_row):
    """ Provide a function that adds a postcode relation whose geometry
        is a small square around the given centroid.
    """
    def _add_area(osm_id, country, postcode, x, y):
        """ Add a row of OSM type 'R' with centroid (x, y) and a square
            polygon reaching 0.001 units from the centroid on each side.
        """
        lo_x, hi_x = x - 0.001, x + 0.001
        lo_y, hi_y = y - 0.001, y + 0.001
        # Closed ring: the first corner is repeated at the end.
        corners = ((lo_x, lo_y), (lo_x, hi_y), (hi_x, hi_y), (hi_x, lo_y), (lo_x, lo_y))
        ring = ", ".join(f"{cx} {cy}" for cx, cy in corners)

        place_postcode_row(osm_type='R', osm_id=osm_id,
                           postcode=postcode, country=country,
                           centroid=f"POINT({x} {y})",
                           geom=f"POLYGON(({ring}))")

    return _add_area
48
49
@pytest.fixture
def postcode_update(dsn, temp_db_conn):
    """ Provide a function that creates the update, delete and insert
        triggers on location_postcodes and then calls
        postcodes.update_postcodes() with a dummy tokenizer.
    """
    trigger_statements = (
        """CREATE TRIGGER location_postcodes_before_update
                            BEFORE UPDATE ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_update()""",
        """CREATE TRIGGER location_postcodes_before_delete
                            BEFORE DELETE ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_delete()""",
        """CREATE TRIGGER location_postcodes_before_insert
                            BEFORE INSERT ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_insert()""",
    )
    tokenizer = dummy_tokenizer.DummyTokenizer(None)

    def _run_update(data_path=None):
        """ Install the triggers, commit, then run the postcode update
            with `data_path` as the location of external postcode data.
        """
        with temp_db_conn.cursor() as cur:
            for statement in trigger_statements:
                cur.execute(statement)
        # Commit before the update, which is given only the DSN.
        temp_db_conn.commit()
        postcodes.update_postcodes(dsn, data_path, tokenizer)

    return _run_update
68
69
class TestPostcodes:
    """ Tests for postcodes.update_postcodes(), run against a temporary
        database in which some SQL helper functions are replaced by
        minimal implementations.
    """

    @pytest.fixture(autouse=True)
    def setup(self, def_config, postcode_table, placex_table, place_postcode_table,
              load_sql, temp_db_conn):
        """ Prepare the database for every test in this class.

            Keeps the connection for later queries, sets up the country
            configuration, loads the postcode trigger functions and
            creates minimal SQL versions of token_normalized_postcode()
            (returns its input), get_country_code() (returns NULL) and
            expand_by_meters().
        """
        self.conn = temp_db_conn
        country_info.setup_country_config(def_config)
        load_sql('functions/postcode_triggers.sql')

        temp_db_conn.execute("""
            CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
            RETURNS TEXT AS $$
              SELECT postcode
            $$ LANGUAGE sql;

            CREATE OR REPLACE FUNCTION get_country_code(place geometry)
            RETURNS TEXT AS $$
              SELECT NULL
            $$ LANGUAGE sql;

            CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
            RETURNS GEOMETRY AS $$
              SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
            $$ LANGUAGE sql;
        """)

    @property
    def row_set(self):
        """ Return the content of location_postcodes as a set of tuples
            (osm_id, country_code, postcode, x, y).
        """
        with self.conn.cursor(row_factory=tuple_row) as cur:
            cur.execute("""SELECT osm_id, country_code, postcode,
                                  ST_X(centroid), ST_Y(centroid)
                           FROM location_postcodes""")
            return set(cur)

    def test_postcodes_empty(self, postcode_update):
        """ An update without any input data yields no postcodes.
        """
        postcode_update()

        assert not self.row_set

    @pytest.mark.parametrize('in_placex', [True, False])
    def test_postcodes_add_new_point(self, postcode_update, postcode_row,
                                     insert_implicit_postcode, in_placex):
        """ A newly inserted point postcode is the only row after the
            update; the 'yy' row added with `postcode_row` is not in
            the result.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
        postcode_row('yy', '9486', 99, 34)

        postcode_update()

        assert self.row_set == {(None, 'xx', '9486', 10, 12), }

    def test_postcodes_add_new_area(self, postcode_update, insert_postcode_area):
        """ A postcode area yields a row with its OSM id and centroid.
        """
        insert_postcode_area(345, 'de', '10445', 23.5, 46.2)

        postcode_update()

        assert self.row_set == {(345, 'de', '10445', 23.5, 46.2)}

    @pytest.mark.parametrize('in_placex', [True, False])
    def test_postcodes_add_area_and_point(self, postcode_update, insert_postcode_area,
                                          insert_implicit_postcode, in_placex):
        """ When a point and an area share country and postcode, only
            the area row is in the result.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
        insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

        postcode_update()

        assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}

    @pytest.mark.parametrize('in_placex', [True, False])
    def test_postcodes_add_point_within_area(self, postcode_update, insert_postcode_area,
                                             insert_implicit_postcode, in_placex):
        """ A point with a different postcode located at the centroid
            of a postcode area does not produce a row of its own.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
        insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

        postcode_update()

        assert self.row_set == {(345, 'xx', '10445', 23.5, 46.2)}

    @pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
                                        (9, 34), (9, 11), (23, 11)])
    def test_postcodes_replace_coordinates(self, postcode_update, postcode_row, tmp_path,
                                           insert_implicit_postcode, coords):
        """ A `postcode_row` entry with different coordinates ends up
            with the coordinates of the inserted point.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
        postcode_row('xx', 'AB 4511', *coords)

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}

    def test_postcodes_replace_coordinates_close(self, postcode_update, postcode_row,
                                                 insert_implicit_postcode):
        """ A `postcode_row` entry whose coordinates are nearly equal
            to the inserted point keeps its own coordinates.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
        postcode_row('xx', 'AB 4511', 10, 11.99999999)

        postcode_update()

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}

    def test_postcodes_remove_point(self, postcode_update, postcode_row,
                                    insert_implicit_postcode):
        """ A `postcode_row` entry without matching source data
            ('badname') is gone after the update.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
        postcode_row('xx', 'badname', 10, 12)

        postcode_update()

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}

    def test_postcodes_ignore_empty_country(self, postcode_update, insert_implicit_postcode):
        """ A postcode without a country produces no row. The
            get_country_code() created in setup() returns NULL.
        """
        insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')
        postcode_update()
        assert not self.row_set

    def test_postcodes_remove_all(self, postcode_update, postcode_row, place_postcode_table):
        """ Without any source data, a `postcode_row` entry is gone
            after the update.
        """
        postcode_row('ch', '5613', 10, 12)
        postcode_update()

        assert not self.row_set

    def test_postcodes_multi_country(self, postcode_update,
                                     insert_implicit_postcode):
        """ Postcodes are kept per country: the same postcode in two
            countries yields two rows.
        """
        insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
        insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
        insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
        insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')

        postcode_update()

        assert self.row_set == {(None, 'de', '54451', 10, 12),
                                (None, 'de', '54452', 10.3, 11.0),
                                (None, 'cc', '54452', 10.3, 11.0),
                                (None, 'cc', 'DD23 T', 100, 56)}

    @pytest.mark.parametrize("gzipped", [True, False])
    def test_postcodes_extern(self, postcode_update, tmp_path,
                              insert_implicit_postcode, gzipped):
        """ An external CSV file (plain or gzipped) adds the postcode
            'CD 4511', while 'AB 4511' keeps the coordinates of the
            inserted point.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes.csv'
        extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')

        if gzipped:
            # check=True: a failing gzip call raises right here instead of
            # surfacing only through the assertion below.
            subprocess.run(['gzip', str(extfile)], check=True)
            assert not extfile.is_file()

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                (None, 'xx', 'CD 4511', -10, -5)}

    def test_postcodes_extern_bad_column(self, postcode_update, tmp_path,
                                         insert_implicit_postcode):
        """ An external CSV file with a misspelled header ('postode')
            contributes no postcodes.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes.csv'
        extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10", encoding='utf-8')

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12)}

    def test_postcodes_extern_bad_number(self, postcode_update, insert_implicit_postcode,
                                         tmp_path):
        """ External CSV rows with a NaN longitude or a latitude of 200
            are not in the result; the remaining row is imported.
        """
        insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

        extfile = tmp_path / 'xx_postcodes.csv'
        extfile.write_text(
            "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0", encoding='utf-8')

        postcode_update(tmp_path)

        assert self.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                (None, 'xx', 'CD 4511', -10, -5)}

    def test_no_placex_entry(self, postcode_update, temp_db_cursor, place_postcode_row):
        """ A postcode row inserted without a country gets the country
            code returned by get_country_code().
        """
        # Rewrite the get_country_code function to verify its execution.
        temp_db_cursor.execute("""
            CREATE OR REPLACE FUNCTION get_country_code(place geometry) RETURNS TEXT AS $$
              SELECT 'yy' $$ LANGUAGE sql""")
        place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')
        postcode_update()

        assert self.row_set == {(None, 'yy', 'AB 4511', 10, 12)}

    def test_discard_badly_formatted_postcodes(self, postcode_update, place_postcode_row):
        """ The postcode 'AB 4511' for country 'fr' produces no row.
        """
        place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')
        postcode_update()

        assert not self.row_set
255
256
def test_can_compute(dsn, table_factory):
    """ can_compute() is false until a `place_postcode` table has been
        created through `table_factory`, and true afterwards.
    """
    available_before = postcodes.can_compute(dsn)
    assert not available_before

    table_factory('place_postcode')

    available_after = postcodes.can_compute(dsn)
    assert available_after