1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Tests for functions to maintain the artificial postcode table.
14 from nominatim.tools import postcodes
15 import dummy_tokenizer
17 class MockPostcodeTable:
18 """ A location_postcode table for testing.
20 def __init__(self, conn):
22 with conn.cursor() as cur:
23 cur.execute("""CREATE TABLE location_postcode (
25 parent_place_id BIGINT,
27 rank_address SMALLINT,
28 indexed_status SMALLINT,
29 indexed_date TIMESTAMP,
30 country_code varchar(2),
32 geometry GEOMETRY(Geometry, 4326))""")
33 cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
34 RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
36 CREATE OR REPLACE FUNCTION get_country_code(place geometry)
37 RETURNS TEXT AS $$ BEGIN
39 END; $$ LANGUAGE plpgsql;
43 def add(self, country, postcode, x, y):
44 with self.conn.cursor() as cur:
45 cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
46 country_code, postcode,
48 VALUES (nextval('seq_place'), 1, %s, %s,
49 'SRID=4326;POINT(%s %s)')""",
50 (country, postcode, x, y))
56 with self.conn.cursor() as cur:
57 cur.execute("""SELECT country_code, postcode,
58 ST_X(geometry), ST_Y(geometry)
59 FROM location_postcode""")
60 return set((tuple(row) for row in cur))
65 return dummy_tokenizer.DummyTokenizer(None, None)
def postcode_table(temp_db_conn, placex_table, word_table):
    """ Create a mock location_postcode table on the temporary
        database connection and hand it to the test.

        placex_table and word_table are not used directly; presumably
        they are requested so those tables exist first (TODO confirm
        against the fixture definitions).
    """
    table = MockPostcodeTable(temp_db_conn)
    return table
def test_postcodes_empty(dsn, postcode_table, place_table,
                         tmp_path, tokenizer):
    """ Updating postcodes without inserting any data leaves the
        postcode table without rows.
    """
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set
def test_postcodes_add_new(
        dsn, postcode_table, tmp_path, insert_implicit_postcode, tokenizer):
    """ A postcode found for country 'xx' is added, while the existing
        row with the same postcode for country 'yy' disappears.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': '9486'})
    postcode_table.add('yy', '9486', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', '9486', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_replace_coordinates(
        dsn, postcode_table, tmp_path, insert_implicit_postcode, tokenizer):
    """ An existing postcode row at (99, 34) ends up with the
        coordinates (10, 12) of the inserted place after the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'AB 4511', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_replace_coordinates_close(
        dsn, postcode_table, tmp_path, insert_implicit_postcode, tokenizer):
    """ An existing postcode row whose position (10, 11.99999) is very
        close to the inserted place at (10, 12) keeps its coordinates.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'AB 4511', 10, 11.99999)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 11.99999)}
    assert postcode_table.row_set == expected
def test_postcodes_remove(
        dsn, postcode_table, tmp_path, insert_implicit_postcode, tokenizer):
    """ A table row ('badname') that has no matching inserted place is
        gone after the update; the postcode of the place is present.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'badname', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_ignore_empty_country(
        dsn, postcode_table, tmp_path, insert_implicit_postcode, tokenizer):
    """ A place inserted with country None produces no postcode row.
    """
    address = {'postcode': 'AB 4511'}
    insert_implicit_postcode(1, None, 'POINT(10 12)', address)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set
def test_postcodes_remove_all(
        dsn, postcode_table, place_table, tmp_path, tokenizer):
    """ With no places inserted, the only existing postcode row is
        removed and the table ends up empty.
    """
    postcode_table.add('ch', '5613', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set
def test_postcodes_multi_country(
        dsn, postcode_table, tmp_path, insert_implicit_postcode, tokenizer):
    """ Postcodes from several countries are stored per country; the
        same postcode at the same position in two countries yields
        two rows.
    """
    places = (
        (1, 'de', 'POINT(10 12)', '54451'),
        (2, 'cc', 'POINT(100 56)', 'DD23 T'),
        (3, 'de', 'POINT(10.3 11.0)', '54452'),
        (4, 'cc', 'POINT(10.3 11.0)', '54452'),
    )
    for osm_id, country, geometry, postcode in places:
        insert_implicit_postcode(osm_id, country, geometry, {'postcode': postcode})

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {
        ('de', '54451', 10, 12),
        ('de', '54452', 10.3, 11.0),
        ('cc', '54452', 10.3, 11.0),
        ('cc', 'DD23 T', 100, 56),
    }
    assert postcode_table.row_set == expected
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer, gzipped):
    """ Postcodes from an external 'xx_postcodes.csv' file are added.

        The postcode that also exists as an inserted place keeps the
        position of the place (10, 12); the one only present in the
        file is added at its lon/lat (-10, -5). The test runs once with
        the plain CSV file and once with the file compressed by gzip.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    # Only compress for the gzipped variant, otherwise the plain-file
    # case would never be exercised.
    if gzipped:
        # check=True surfaces a failing gzip call with its own error
        # instead of the less specific assertion below.
        subprocess.run(['gzip', str(extfile)], check=True)
        # The uncompressed file must be gone after compression.
        assert not extfile.is_file()

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}
def test_postcodes_extern_bad_column(
        dsn, postcode_table, tmp_path, insert_implicit_postcode, tokenizer):
    """ An external file whose header reads 'postode' instead of
        'postcode' contributes no rows; only the postcode of the
        inserted place is present.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_path = tmp_path / 'xx_postcodes.csv'
    csv_path.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
def test_postcodes_extern_bad_number(
        dsn, insert_implicit_postcode, postcode_table, tmp_path, tokenizer):
    """ External rows with a NaN longitude or a latitude of 200 do not
        show up in the table; the valid row of the same file does.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_path = tmp_path / 'xx_postcodes.csv'
    csv_path.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {
        ('xx', 'AB 4511', 10, 12),
        ('xx', 'CD 4511', -10, -5),
    }
    assert postcode_table.row_set == expected
def test_can_compute(dsn, table_factory):
    """ can_compute() is false before the 'place' table is created and
        true once it exists.
    """
    before = postcodes.can_compute(dsn)
    assert not before

    table_factory('place')

    after = postcodes.can_compute(dsn)
    assert after
def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer):
    """ A postcode is computed for a row that is only added via
        place_row (nothing is added to placex here); its country code
        is expected to be the value returned by get_country_code().
    """
    # Rewrite the get_country_code function to verify its execution.
    # It must return the country code 'fr' that the assertion below expects.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN
        RETURN 'fr';
        END; $$ LANGUAGE plpgsql;
    """)
    place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('fr', 'AB 4511', 10, 12)}
def insert_implicit_postcode(placex_table, place_row):
    """ Return a helper that inserts data into the placex and place
        table which can then be used to compute one postcode.

        The helper takes (osm_id, country, geometry, address). The
        geometry is passed unchanged to placex_table.add() and with an
        'SRID=4326;' prefix to place_row(); the country is only passed
        to placex_table.add() and the address only to place_row().
    """
    def _insert_implicit_postcode(osm_id, country, geometry, address):
        placex_table.add(osm_id=osm_id, country=country, geom=geometry)
        # place_row receives the geometry with an explicit SRID prefix.
        place_row(osm_id=osm_id, geom='SRID=4326;' + geometry, address=address)

    return _insert_implicit_postcode