]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_import_special_phrases.py
Merge pull request #3991 from lonvia/interpolation-on-addresses
[nominatim.git] / test / python / tools / test_import_special_phrases.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Tests for import special phrases methods
9     of the class SPImporter.
10 """
11 import pytest
12 from nominatim_db.tools.special_phrases.sp_importer import SPImporter
13 from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
14 from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
15
16
17 @pytest.fixture
18 def sp_importer(temp_db_conn, def_config, monkeypatch):
19     """
20         Return an instance of SPImporter.
21     """
22     monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
23     loader = SPWikiLoader(def_config)
24     return SPImporter(def_config, temp_db_conn, loader)
25
26
27 @pytest.fixture
28 def xml_wiki_content(src_dir):
29     """
30         return the content of the static xml test file.
31     """
32     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
33     return xml_test_content.read_text(encoding='utf-8')
34
35
36 @pytest.fixture
37 def default_phrases(table_factory):
38     table_factory('place_classtype_testclasstypetable_to_delete')
39     table_factory('place_classtype_testclasstypetable_to_keep')
40
41
42 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
43     """
44         Check for the fetch_existing_place_classtype_tables() method.
45         It should return the table just created.
46     """
47     table_factory('place_classtype_testclasstypetable')
48
49     sp_importer._fetch_existing_place_classtype_tables()
50     contained_table = sp_importer.table_phrases_to_delete.pop()
51     assert contained_table == 'place_classtype_testclasstypetable'
52
53
54 def test_check_sanity_class(sp_importer):
55     """
56         Check for _check_sanity() method.
57         If a wrong class or type is given, an UsageError should raise.
58         If a good class and type are given, nothing special happens.
59     """
60
61     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
62     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
63
64     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
65
66
67 def test_load_white_and_black_lists(sp_importer):
68     """
69         Test that _load_white_and_black_lists() well return
70         black list and white list and that they are of dict type.
71     """
72     black_list, white_list = sp_importer._load_white_and_black_lists()
73
74     assert isinstance(black_list, dict) and isinstance(white_list, dict)
75
76
77 def test_create_place_classtype_indexes(temp_db_with_extensions,
78                                         temp_db_conn, temp_db_cursor,
79                                         table_factory, sp_importer):
80     """
81         Test that _create_place_classtype_indexes() create the
82         place_id index and centroid index on the right place_class_type table.
83     """
84     phrase_class = 'class'
85     phrase_type = 'type'
86     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
87
88     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
89
90     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
91     temp_db_conn.commit()
92
93     assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
94
95
96 def test_create_place_classtype_table(temp_db_conn, temp_db_cursor, placex_table, sp_importer):
97     """
98         Test that _create_place_classtype_table() create
99         the right place_classtype table.
100     """
101     phrase_class = 'class'
102     phrase_type = 'type'
103     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
104     temp_db_conn.commit()
105
106     assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
107
108
109 def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
110                                   def_config, sp_importer):
111     """
112         Test that _grant_access_to_webuser() give
113         right access to the web user.
114     """
115     phrase_class = 'class'
116     phrase_type = 'type'
117     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
118
119     table_factory(table_name)
120
121     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
122     temp_db_conn.commit()
123
124     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER,
125                               phrase_class, phrase_type)
126
127
128 def test_create_place_classtype_table_and_indexes(temp_db_cursor, def_config, placex_row,
129                                                   sp_importer, temp_db_conn, monkeypatch):
130     """
131         Test that _create_place_classtype_table_and_indexes()
132         create the right place_classtype tables and place_id indexes
133         and centroid indexes and grant access to the web user
134         for the given set of pairs.
135     """
136     pairs = set([('class1', 'type1'), ('class2', 'type2')])
137     for pair in pairs:
138         placex_row(cls=pair[0], typ=pair[1])   # adding to db
139     sp_importer._create_classtype_table_and_indexes(pairs)
140     temp_db_conn.commit()
141
142     for pair in pairs:
143         assert check_table_exist(temp_db_cursor, pair[0], pair[1])
144         assert check_placeid_and_centroid_indexes(temp_db_cursor, pair[0], pair[1])
145         assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, pair[0], pair[1])
146
147
148 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
149                                             temp_db_conn, temp_db_cursor):
150     """
151         Check for the remove_non_existent_phrases_from_db() method.
152
153         It should removed entries from the word table which are contained
154         in the words_phrases_to_delete set and not those also contained
155         in the words_phrases_still_exist set.
156
157         place_classtype tables contained in table_phrases_to_delete should
158         be deleted.
159     """
160     sp_importer.table_phrases_to_delete = {
161         'place_classtype_testclasstypetable_to_delete'
162     }
163
164     query_tables = """
165         SELECT table_name
166         FROM information_schema.tables
167         WHERE table_schema='public'
168         AND table_name like 'place_classtype_%';
169     """
170
171     sp_importer._remove_non_existent_tables_from_db()
172     temp_db_conn.commit()
173
174     assert temp_db_cursor.row_set(query_tables) \
175         == {('place_classtype_testclasstypetable_to_keep', )}
176
177
178 @pytest.mark.parametrize("should_replace", [(True), (False)])
179 def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
180                         placex_row, table_factory, tokenizer_mock,
181                         xml_wiki_content, should_replace):
182     """
183         Check that the main import_phrases() method is well executed.
184         It should create the place_classtype table, the place_id and centroid indexes,
185         grand access to the web user and executing the SQL functions for amenities.
186         It should also update the database well by deleting or preserving existing entries
187         of the database.
188     """
189     # Add some data to the database before execution in order to test
190     # what is deleted and what is preserved.
191     table_factory('place_classtype_amenity_animal_shelter')
192     table_factory('place_classtype_wrongclass_wrongtype')
193
194     monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
195                         lambda lang: xml_wiki_content)
196
197     class_test = 'aerialway'
198     type_test = 'zip_line'
199
200     tokenizer = tokenizer_mock()
201     placex_row(cls=class_test, typ=type_test)  # in db for special phrase filtering
202     placex_row(cls='amenity', typ='animal_shelter')  # in db for special phrase filtering
203     sp_importer.import_phrases(tokenizer, should_replace)
204
205     assert len(tokenizer.analyser_cache['special_phrases']) == 19
206
207     assert check_table_exist(temp_db_cursor, class_test, type_test)
208     assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
209     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
210     assert check_table_exist(temp_db_cursor, 'amenity', 'animal_shelter')
211     if should_replace:
212         assert not check_table_exist(temp_db_cursor, 'wrong_class', 'wrong_type')
213
214     assert temp_db_cursor.table_exists('place_classtype_amenity_animal_shelter')
215     if should_replace:
216         assert not temp_db_cursor.table_exists('place_classtype_wrongclass_wrongtype')
217
218
219 def check_table_exist(temp_db_cursor, phrase_class, phrase_type):
220     """
221         Verify that the place_classtype table exists for the given
222         phrase_class and phrase_type.
223     """
224     return temp_db_cursor.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
225
226
227 def check_grant_access(temp_db_cursor, user, phrase_class, phrase_type):
228     """
229         Check that the web user has been granted right access to the
230         place_classtype table of the given phrase_class and phrase_type.
231     """
232     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
233
234     temp_db_cursor.execute("""
235             SELECT * FROM information_schema.role_table_grants
236             WHERE table_name='{}'
237             AND grantee='{}'
238             AND privilege_type='SELECT'""".format(table_name, user))
239     return temp_db_cursor.fetchone()
240
241
242 def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type):
243     """
244         Check that the place_id index and centroid index exist for the
245         place_classtype table of the given phrase_class and phrase_type.
246     """
247     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
248     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
249
250     return (
251         temp_db_cursor.index_exists(table_name, index_prefix + 'centroid')
252         and
253         temp_db_cursor.index_exists(table_name, index_prefix + 'place_id')
254     )
255
256
257 @pytest.mark.parametrize("should_replace", [(True), (False)])
258 def test_import_phrases_special_phrase_filtering(monkeypatch, temp_db_cursor, def_config,
259                                                  sp_importer, placex_row, tokenizer_mock,
260                                                  xml_wiki_content, should_replace):
261
262     monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
263                         lambda lang: xml_wiki_content)
264
265     class_test = 'aerialway'
266     type_test = 'zip_line'
267
268     placex_row(cls=class_test, typ=type_test)  # add to the database to make valid
269     tokenizer = tokenizer_mock()
270     sp_importer.import_phrases(tokenizer, should_replace)
271
272     assert ('Zip Line', 'aerialway', 'zip_line', '-') in sp_importer.word_phrases
273     assert check_table_exist(temp_db_cursor, class_test, type_test)
274     assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
275     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
276
277
278 def test_get_classtype_pairs_directly(placex_row, temp_db_conn, sp_importer):
279     for _ in range(101):
280         placex_row(cls='highway', typ='residential')
281     for _ in range(99):
282         placex_row(cls='amenity', typ='toilet')
283
284     temp_db_conn.commit()
285
286     result = sp_importer.get_classtype_pairs(100)
287     print("RESULT:", result)
288     assert ('highway', 'residential') in result
289     assert ('amenity', 'toilet') not in result