]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_postcodes.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / tools / test_postcodes.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for functions to maintain the artificial postcode table.
9 """
10 import subprocess
11
12 import pytest
13
14 from nominatim.tools import postcodes, country_info
15 import dummy_tokenizer
16
class MockPostcodeTable:
    """ A location_postcode table for testing.

        Creating an instance creates the `location_postcode` table on the
        given connection together with stub versions of the SQL functions
        `token_normalized_postcode()` (returns its input unchanged) and
        `get_country_code()` (returns NULL), then commits.
    """
    def __init__(self, conn):
        self.conn = conn
        with conn.cursor() as cur:
            cur.execute("""CREATE TABLE location_postcode (
                               place_id BIGINT,
                               parent_place_id BIGINT,
                               rank_search SMALLINT,
                               rank_address SMALLINT,
                               indexed_status SMALLINT,
                               indexed_date TIMESTAMP,
                               country_code varchar(2),
                               postcode TEXT,
                               geometry GEOMETRY(Geometry, 4326))""")
            cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
                           RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;

                           CREATE OR REPLACE FUNCTION get_country_code(place geometry)
                           RETURNS TEXT AS $$ BEGIN 
                           RETURN null;
                           END; $$ LANGUAGE plpgsql;
                        """)
        conn.commit()

    def add(self, country, postcode, x, y):
        """ Insert a postcode row for `country` located at point (x, y)
            and commit. The row gets a fresh place_id from the `seq_place`
            sequence and an indexed_status of 1.
        """
        with self.conn.cursor() as cur:
            # The coordinates are passed as real query parameters. Putting
            # the placeholders inside a quoted EWKT literal only works as
            # long as the driver pastes unquoted text into the statement.
            cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
                                                          country_code, postcode,
                                                          geometry)
                           VALUES (nextval('seq_place'), 1, %s, %s,
                                   ST_SetSRID(ST_MakePoint(%s, %s), 4326))""",
                        (country, postcode, x, y))
        self.conn.commit()


    @property
    def row_set(self):
        """ The current table content as a set of
            (country_code, postcode, x, y) tuples.
        """
        with self.conn.cursor() as cur:
            cur.execute("""SELECT country_code, postcode,
                                  ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode""")
            return {tuple(row) for row in cur}
61
62
@pytest.fixture
def tokenizer():
    """ Provide a DummyTokenizer created with two None arguments.
    """
    dummy = dummy_tokenizer.DummyTokenizer(None, None)
    return dummy
66
67
@pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table):
    """ Set up the country configuration from the default config and
        hand out a MockPostcodeTable living in the temporary database.
    """
    country_info.setup_country_config(def_config)
    table = MockPostcodeTable(temp_db_conn)
    return table
72
73
@pytest.fixture
def insert_implicit_postcode(placex_table, place_row):
    """
        Provide a function that inserts matching data into the placex
        and place table, from which one postcode can then be computed.
    """
    def _insert_implicit_postcode(osm_id, country, geometry, address):
        # placex takes the plain WKT, the place row wants the SRID prefix.
        placex_table.add(osm_id=osm_id, country=country, geom=geometry)
        ewkt = 'SRID=4326;' + geometry
        place_row(osm_id=osm_id, geom=ewkt, address=address)

    return _insert_implicit_postcode
85
86
def test_postcodes_empty(dsn, postcode_table, place_table,
                         tmp_path, tokenizer):
    """ Updating without any place data leaves the postcode table empty.
    """
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
92
93
def test_postcodes_add_new(dsn, postcode_table, tmp_path,
                           insert_implicit_postcode, tokenizer):
    """ After the update the table holds only the postcode computed from
        the place data; the preexisting row of another country is gone.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': '9486'})
    postcode_table.add('yy', '9486', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', '9486', 10, 12)}
    assert postcode_table.row_set == expected
102
103
def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
                                       insert_implicit_postcode, tokenizer):
    """ An existing postcode row far away from the computed position is
        moved to the position derived from the place data.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'AB 4511', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
112
113
def test_postcodes_replace_coordinates_close(dsn, postcode_table, tmp_path,
                                             insert_implicit_postcode, tokenizer):
    """ An existing postcode row that differs only minimally from the
        computed position keeps its stored coordinates.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'AB 4511', 10, 11.99999)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 11.99999)}
    assert postcode_table.row_set == expected
122
123
def test_postcodes_remove(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer):
    """ A stored postcode that no longer appears in the place data is
        dropped, while the postcode from the place data is added.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'badname', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
132
133
def test_postcodes_ignore_empty_country(dsn, postcode_table, tmp_path,
                                        insert_implicit_postcode, tokenizer):
    """ A place inserted with country None does not produce a postcode row.
    """
    insert_implicit_postcode(1, None, 'POINT(10 12)', {'postcode': 'AB 4511'})

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
139
140
def test_postcodes_remove_all(dsn, postcode_table, place_table,
                              tmp_path, tokenizer):
    """ With no place data at all, a preexisting postcode row is removed.
    """
    postcode_table.add('ch', '5613', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
147
148
def test_postcodes_multi_country(dsn, postcode_table, tmp_path,
                                 insert_implicit_postcode, tokenizer):
    """ Postcodes are tracked per country: the same postcode appearing in
        two countries yields one row for each of them.
    """
    places = (
        (1, 'de', 'POINT(10 12)', '54451'),
        (2, 'cc', 'POINT(100 56)', 'DD23 T'),
        (3, 'de', 'POINT(10.3 11.0)', '54452'),
        (4, 'cc', 'POINT(10.3 11.0)', '54452'),
    )
    for osm_id, country, geometry, postcode in places:
        insert_implicit_postcode(osm_id, country, geometry, {'postcode': postcode})

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {
        ('de', '54451', 10, 12),
        ('de', '54452', 10.3, 11.0),
        ('cc', '54452', 10.3, 11.0),
        ('cc', 'DD23 T', 100, 56),
    }
    assert postcode_table.row_set == expected
162
163
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer, gzipped):
    """ Postcodes from an external `xx_postcodes.csv` file in the project
        directory are added to the table. A postcode that is also present
        in the place data keeps the position computed from the place data.
        The test is run with the plain and with the gzipped file.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    if gzipped:
        # check=True makes a failing gzip abort the test right here with
        # the exit status instead of a less informative assertion later.
        subprocess.run(['gzip', str(extfile)], check=True)
        assert not extfile.is_file()

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}
180
181
def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path,
                                     insert_implicit_postcode, tokenizer):
    """ An external file whose header lacks a `postcode` column (it is
        misspelled here) contributes no rows; only the postcode from the
        place data ends up in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_content = "postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10"
    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text(csv_content)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
192
193
def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
                                     postcode_table, tmp_path, tokenizer):
    """ Rows of the external file with unusable coordinates (a NaN
        longitude, a latitude of 200) are skipped while the remaining
        valid row is still imported.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_content = "postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0"
    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text(csv_content)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {
        ('xx', 'AB 4511', 10, 12),
        ('xx', 'CD 4511', -10, -5),
    }
    assert postcode_table.row_set == expected
205
def test_can_compute(dsn, table_factory):
    """ can_compute() reports false until a `place` table has been
        created and true afterwards.
    """
    # Nothing has been created through table_factory yet.
    assert not postcodes.can_compute(dsn)

    table_factory('place')

    assert postcodes.can_compute(dsn)
210
211
def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row,
                         postcode_table, tokenizer):
    """ For a place row without accompanying placex data (presumably the
        place_row fixture does not touch placex), the country of the
        postcode is taken from get_country_code().
    """
    # Rewrite the get_country_code function to verify its execution.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN 
        RETURN 'yy';
        END; $$ LANGUAGE plpgsql;
    """)
    place_row(geom='SRID=4326;POINT(10 12)', address={'postcode': 'AB 4511'})

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('yy', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
224
225
def test_discard_badly_formatted_postcodes(dsn, tmp_path, temp_db_cursor,
                                           place_row, postcode_table,
                                           tokenizer):
    """ With get_country_code() answering 'fr', the postcode 'AB 4511' is
        not imported (presumably because it does not match the postcode
        format configured for that country).
    """
    # Rewrite the get_country_code function to verify its execution.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN 
        RETURN 'fr';
        END; $$ LANGUAGE plpgsql;
    """)
    place_row(geom='SRID=4326;POINT(10 12)', address={'postcode': 'AB 4511'})

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining