1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Tests for functions to maintain the artificial postcode table.
14 from nominatim_db.tools import postcodes
15 from nominatim_db.data import country_info
16 import dummy_tokenizer
# Test double for the location_postcode table: on construction it creates the
# table and stub SQL functions (token_normalized_postcode, get_country_code)
# through a cursor of the given connection; `add` inserts one row with a point
# geometry, and the final query returns (country_code, postcode, x, y) tuples
# as a set.
# NOTE(review): several lines of this definition are not visible in this
# listing (e.g. where self.conn is assigned, the header of the accessor that
# runs the final SELECT, the body of get_country_code) - confirm against the
# repository copy before changing the code.
19 class MockPostcodeTable:
20 """ A location_postcode table for testing.
22 def __init__(self, conn):
24 with conn.cursor() as cur:
25 cur.execute("""CREATE TABLE location_postcode (
27 parent_place_id BIGINT,
29 rank_address SMALLINT,
30 indexed_status SMALLINT,
31 indexed_date TIMESTAMP,
32 country_code varchar(2),
34 geometry GEOMETRY(Geometry, 4326))""")
35 cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
36 RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
38 CREATE OR REPLACE FUNCTION get_country_code(place geometry)
39 RETURNS TEXT AS $$ BEGIN
41 END; $$ LANGUAGE plpgsql;
45 def add(self, country, postcode, x, y):
46 with self.conn.cursor() as cur:
47 cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
48 country_code, postcode,
50 VALUES (nextval('seq_place'), 1, %s, %s,
51 ST_SetSRID(ST_MakePoint(%s, %s), 4326))""",
52 (country, postcode, x, y))
57 with self.conn.cursor() as cur:
58 cur.execute("""SELECT country_code, postcode,
59 ST_X(geometry), ST_Y(geometry)
60 FROM location_postcode""")
61 return set((tuple(row) for row in cur))
66 return dummy_tokenizer.DummyTokenizer(None)
def postcode_table(def_config, temp_db_conn, placex_table):
    """ Call country_info.setup_country_config() with `def_config` and
        return a new MockPostcodeTable created on `temp_db_conn`.

        `placex_table` is accepted but not referenced in the body.
    """
    country_info.setup_country_config(def_config)
    mock_table = MockPostcodeTable(temp_db_conn)
    return mock_table
def insert_implicit_postcode(placex_table, place_row):
    """ Return a helper that writes one object into both the placex and
        the place table, so that a postcode can be computed from it.

        The helper takes `osm_id`, `country`, `geometry` (WKT without SRID
        prefix) and `address`.
    """
    def _add_rows(osm_id, country, geometry, address):
        # placex receives the bare geometry and the country ...
        placex_table.add(osm_id=osm_id, country=country, geom=geometry)
        # ... while the place row gets the SRID-prefixed form and the address.
        ewkt = 'SRID=4326;' + geometry
        place_row(osm_id=osm_id, geom=ewkt, address=address)

    return _add_rows
def test_postcodes_empty(dsn, postcode_table, place_table, tokenizer):
    """ Updating when no postcode data was inserted must leave
        location_postcode without rows.
    """
    postcodes.update_postcodes(dsn, None, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
def test_postcodes_add_new(dsn, postcode_table, insert_implicit_postcode, tokenizer):
    """ A postcode present in the place data ends up in the table; the
        pre-existing row with the same postcode for country 'yy' does not
        remain.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': '9486'})
    postcode_table.add('yy', '9486', 99, 34)

    postcodes.update_postcodes(dsn, None, tokenizer)

    expected = {('xx', '9486', 10, 12)}
    assert postcode_table.row_set == expected
@pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
                                    (9, 34), (9, 11), (23, 11)])
def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
                                       insert_implicit_postcode, tokenizer, coords):
    """ An existing entry stored at `coords` is expected at the position
        of the place data (10, 12) after the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    old_x, old_y = coords
    postcode_table.add('xx', 'AB 4511', old_x, old_y)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_replace_coordinates_close(dsn, postcode_table,
                                             insert_implicit_postcode, tokenizer):
    """ An existing entry stored at (10, 11.99999999) is expected to keep
        exactly those coordinates although the place data sits at (10, 12).
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'AB 4511', 10, 11.99999999)

    postcodes.update_postcodes(dsn, None, tokenizer)

    unchanged = {('xx', 'AB 4511', 10, 11.99999999)}
    assert postcode_table.row_set == unchanged
def test_postcodes_remove(dsn, postcode_table,
                          insert_implicit_postcode, tokenizer):
    """ The existing entry 'badname', which has no counterpart in the
        place data, is no longer in the table after the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'badname', 10, 12)

    postcodes.update_postcodes(dsn, None, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_ignore_empty_country(dsn, postcode_table,
                                        insert_implicit_postcode, tokenizer):
    """ Place data inserted with country None must not produce a row.
    """
    insert_implicit_postcode(1, None, 'POINT(10 12)', {'postcode': 'AB 4511'})

    postcodes.update_postcodes(dsn, None, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
def test_postcodes_remove_all(dsn, postcode_table, place_table, tokenizer):
    """ With a single existing entry and no place data inserted, the table
        is expected to be empty after the update.
    """
    postcode_table.add('ch', '5613', 10, 12)

    postcodes.update_postcodes(dsn, None, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
def test_postcodes_multi_country(dsn, postcode_table,
                                 insert_implicit_postcode, tokenizer):
    """ Postcodes are expected per country: the same postcode at the same
        position in 'de' and 'cc' yields one row for each country.
    """
    places = [
        (1, 'de', 'POINT(10 12)', '54451'),
        (2, 'cc', 'POINT(100 56)', 'DD23 T'),
        (3, 'de', 'POINT(10.3 11.0)', '54452'),
        (4, 'cc', 'POINT(10.3 11.0)', '54452'),
    ]
    for osm_id, country, geometry, postcode in places:
        insert_implicit_postcode(osm_id, country, geometry, {'postcode': postcode})

    postcodes.update_postcodes(dsn, None, tokenizer)

    expected = {
        ('de', '54451', 10, 12),
        ('de', '54452', 10.3, 11.0),
        ('cc', '54452', 10.3, 11.0),
        ('cc', 'DD23 T', 100, 56),
    }
    assert postcode_table.row_set == expected
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer, gzipped):
    """ Postcodes from an external 'xx_postcodes.csv' in `tmp_path` are
        expected in the table, while a postcode that is also present in
        the place data keeps the position from the place data.

        With `gzipped` set, the CSV file is compressed with the external
        `gzip` tool first, so that only the compressed file is left in
        the directory; otherwise the plain CSV file is used.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    if gzipped:
        # check=True makes a failing or missing gzip surface as an error
        # right here instead of through a later, less obvious assertion.
        subprocess.run(['gzip', str(extfile)], check=True)
        # gzip replaces the input file with the compressed one.
        assert not extfile.is_file()

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}
def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path,
                                     insert_implicit_postcode, tokenizer):
    """ An external file whose header spells the postcode column as
        'postode' must not contribute any rows.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_file = tmp_path / 'xx_postcodes.csv'
    csv_file.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
                                     postcode_table, tmp_path, tokenizer):
    """ External rows with a NaN longitude or a latitude of 200 are
        expected to be left out, while the well-formed row is imported.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_file = tmp_path / 'xx_postcodes.csv'
    csv_file.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {
        ('xx', 'AB 4511', 10, 12),
        ('xx', 'CD 4511', -10, -5),
    }
    assert postcode_table.row_set == expected
def test_can_compute(dsn, table_factory):
    """ can_compute() is expected to be false before a 'place' table is
        created and true afterwards.
    """
    computable_before = postcodes.can_compute(dsn)
    assert not computable_before

    table_factory('place')

    computable_after = postcodes.can_compute(dsn)
    assert computable_after
# Checks that a postcode is computed for a place row that was inserted without
# a matching placex entry: get_country_code() is redefined in SQL first, and
# the resulting row is expected to carry country code 'yy'.
# NOTE(review): the statement inside the redefined SQL function and the end of
# the execute() call are not visible in this listing - presumably the function
# returns 'yy'; confirm against the repository copy.
213 def test_no_placex_entry(dsn, temp_db_cursor, place_row, postcode_table, tokenizer):
214 # Rewrite the get_country_code function to verify its execution.
215 temp_db_cursor.execute("""
216 CREATE OR REPLACE FUNCTION get_country_code(place geometry)
217 RETURNS TEXT AS $$ BEGIN
219 END; $$ LANGUAGE plpgsql;
221 place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
222 postcodes.update_postcodes(dsn, None, tokenizer)
224 assert postcode_table.row_set == {('yy', 'AB 4511', 10, 12)}
# Checks that no row is created for the postcode 'AB 4511' of a place row once
# get_country_code() has been redefined in SQL; the table must stay empty.
# NOTE(review): the country code returned by the redefined SQL function is not
# visible in this listing - presumably a country whose postcode format does
# not accept 'AB 4511'; confirm against the repository copy.
227 def test_discard_badly_formatted_postcodes(dsn, temp_db_cursor, place_row,
228 postcode_table, tokenizer):
229 # Rewrite the get_country_code function to verify its execution.
230 temp_db_cursor.execute("""
231 CREATE OR REPLACE FUNCTION get_country_code(place geometry)
232 RETURNS TEXT AS $$ BEGIN
234 END; $$ LANGUAGE plpgsql;
236 place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
237 postcodes.update_postcodes(dsn, None, tokenizer)
239 assert not postcode_table.row_set