]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_import_special_phrases.py
release 5.1.0.post8
[nominatim.git] / test / python / tools / test_import_special_phrases.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Tests for import special phrases methods
9     of the class SPImporter.
10 """
11 import pytest
12 from nominatim_db.tools.special_phrases.sp_importer import SPImporter
13 from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
14 from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
15
16
17 @pytest.fixture
18 def sp_importer(temp_db_conn, def_config, monkeypatch):
19     """
20         Return an instance of SPImporter.
21     """
22     monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
23     loader = SPWikiLoader(def_config)
24     return SPImporter(def_config, temp_db_conn, loader)
25
26
27 @pytest.fixture
28 def xml_wiki_content(src_dir):
29     """
30         return the content of the static xml test file.
31     """
32     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
33     return xml_test_content.read_text()
34
35
36 @pytest.fixture
37 def default_phrases(table_factory):
38     table_factory('place_classtype_testclasstypetable_to_delete')
39     table_factory('place_classtype_testclasstypetable_to_keep')
40
41
42 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
43     """
44         Check for the fetch_existing_place_classtype_tables() method.
45         It should return the table just created.
46     """
47     table_factory('place_classtype_testclasstypetable')
48
49     sp_importer._fetch_existing_place_classtype_tables()
50     contained_table = sp_importer.table_phrases_to_delete.pop()
51     assert contained_table == 'place_classtype_testclasstypetable'
52
53
54 def test_check_sanity_class(sp_importer):
55     """
56         Check for _check_sanity() method.
57         If a wrong class or type is given, an UsageError should raise.
58         If a good class and type are given, nothing special happens.
59     """
60
61     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
62     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
63
64     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
65
66
67 def test_load_white_and_black_lists(sp_importer):
68     """
69         Test that _load_white_and_black_lists() well return
70         black list and white list and that they are of dict type.
71     """
72     black_list, white_list = sp_importer._load_white_and_black_lists()
73
74     assert isinstance(black_list, dict) and isinstance(white_list, dict)
75
76
77 def test_create_place_classtype_indexes(temp_db_with_extensions,
78                                         temp_db_conn, temp_db_cursor,
79                                         table_factory, sp_importer):
80     """
81         Test that _create_place_classtype_indexes() create the
82         place_id index and centroid index on the right place_class_type table.
83     """
84     phrase_class = 'class'
85     phrase_type = 'type'
86     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
87
88     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
89
90     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
91     temp_db_conn.commit()
92
93     assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
94
95
96 def test_create_place_classtype_table(temp_db_conn, temp_db_cursor, placex_table, sp_importer):
97     """
98         Test that _create_place_classtype_table() create
99         the right place_classtype table.
100     """
101     phrase_class = 'class'
102     phrase_type = 'type'
103     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
104     temp_db_conn.commit()
105
106     assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
107
108
109 def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
110                                   def_config, sp_importer):
111     """
112         Test that _grant_access_to_webuser() give
113         right access to the web user.
114     """
115     phrase_class = 'class'
116     phrase_type = 'type'
117     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
118
119     table_factory(table_name)
120
121     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
122     temp_db_conn.commit()
123
124     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER,
125                               phrase_class, phrase_type)
126
127
128 def test_create_place_classtype_table_and_indexes(
129         temp_db_cursor, def_config, placex_table,
130         sp_importer, temp_db_conn, monkeypatch):
131     """
132         Test that _create_place_classtype_table_and_indexes()
133         create the right place_classtype tables and place_id indexes
134         and centroid indexes and grant access to the web user
135         for the given set of pairs.
136     """
137     pairs = set([('class1', 'type1'), ('class2', 'type2')])
138     for pair in pairs:
139         placex_table.add(cls=pair[0], typ=pair[1])   # adding to db
140     sp_importer._create_classtype_table_and_indexes(pairs)
141     temp_db_conn.commit()
142
143     for pair in pairs:
144         assert check_table_exist(temp_db_cursor, pair[0], pair[1])
145         assert check_placeid_and_centroid_indexes(temp_db_cursor, pair[0], pair[1])
146         assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, pair[0], pair[1])
147
148
149 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
150                                             temp_db_conn, temp_db_cursor):
151     """
152         Check for the remove_non_existent_phrases_from_db() method.
153
154         It should removed entries from the word table which are contained
155         in the words_phrases_to_delete set and not those also contained
156         in the words_phrases_still_exist set.
157
158         place_classtype tables contained in table_phrases_to_delete should
159         be deleted.
160     """
161     sp_importer.table_phrases_to_delete = {
162         'place_classtype_testclasstypetable_to_delete'
163     }
164
165     query_tables = """
166         SELECT table_name
167         FROM information_schema.tables
168         WHERE table_schema='public'
169         AND table_name like 'place_classtype_%';
170     """
171
172     sp_importer._remove_non_existent_tables_from_db()
173     temp_db_conn.commit()
174
175     assert temp_db_cursor.row_set(query_tables) \
176         == {('place_classtype_testclasstypetable_to_keep', )}
177
178
179 @pytest.mark.parametrize("should_replace", [(True), (False)])
180 def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
181                         placex_table, table_factory, tokenizer_mock,
182                         xml_wiki_content, should_replace):
183     """
184         Check that the main import_phrases() method is well executed.
185         It should create the place_classtype table, the place_id and centroid indexes,
186         grand access to the web user and executing the SQL functions for amenities.
187         It should also update the database well by deleting or preserving existing entries
188         of the database.
189     """
190     # Add some data to the database before execution in order to test
191     # what is deleted and what is preserved.
192     table_factory('place_classtype_amenity_animal_shelter')
193     table_factory('place_classtype_wrongclass_wrongtype')
194
195     monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
196                         lambda lang: xml_wiki_content)
197
198     class_test = 'aerialway'
199     type_test = 'zip_line'
200
201     tokenizer = tokenizer_mock()
202     placex_table.add(cls=class_test, typ=type_test)  # in db for special phrase filtering
203     placex_table.add(cls='amenity', typ='animal_shelter')  # in db for special phrase filtering
204     sp_importer.import_phrases(tokenizer, should_replace)
205
206     assert len(tokenizer.analyser_cache['special_phrases']) == 18
207
208     assert check_table_exist(temp_db_cursor, class_test, type_test)
209     assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
210     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
211     assert check_table_exist(temp_db_cursor, 'amenity', 'animal_shelter')
212     if should_replace:
213         assert not check_table_exist(temp_db_cursor, 'wrong_class', 'wrong_type')
214
215     assert temp_db_cursor.table_exists('place_classtype_amenity_animal_shelter')
216     if should_replace:
217         assert not temp_db_cursor.table_exists('place_classtype_wrongclass_wrongtype')
218
219
220 def check_table_exist(temp_db_cursor, phrase_class, phrase_type):
221     """
222         Verify that the place_classtype table exists for the given
223         phrase_class and phrase_type.
224     """
225     return temp_db_cursor.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
226
227
228 def check_grant_access(temp_db_cursor, user, phrase_class, phrase_type):
229     """
230         Check that the web user has been granted right access to the
231         place_classtype table of the given phrase_class and phrase_type.
232     """
233     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
234
235     temp_db_cursor.execute("""
236             SELECT * FROM information_schema.role_table_grants
237             WHERE table_name='{}'
238             AND grantee='{}'
239             AND privilege_type='SELECT'""".format(table_name, user))
240     return temp_db_cursor.fetchone()
241
242
243 def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type):
244     """
245         Check that the place_id index and centroid index exist for the
246         place_classtype table of the given phrase_class and phrase_type.
247     """
248     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
249     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
250
251     return (
252         temp_db_cursor.index_exists(table_name, index_prefix + 'centroid')
253         and
254         temp_db_cursor.index_exists(table_name, index_prefix + 'place_id')
255     )
256
257
258 @pytest.mark.parametrize("should_replace", [(True), (False)])
259 def test_import_phrases_special_phrase_filtering(monkeypatch, temp_db_cursor, def_config,
260                                                  sp_importer, placex_table, tokenizer_mock,
261                                                  xml_wiki_content, should_replace):
262
263     monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
264                         lambda lang: xml_wiki_content)
265
266     class_test = 'aerialway'
267     type_test = 'zip_line'
268
269     placex_table.add(cls=class_test, typ=type_test)  # add to the database to make valid
270     tokenizer = tokenizer_mock()
271     sp_importer.import_phrases(tokenizer, should_replace)
272
273     assert ('Zip Line', 'aerialway', 'zip_line', '-') in sp_importer.word_phrases
274     assert check_table_exist(temp_db_cursor, class_test, type_test)
275     assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
276     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
277
278
279 def test_get_classtype_pairs_directly(placex_table, temp_db_conn, sp_importer):
280     for _ in range(101):
281         placex_table.add(cls='highway', typ='residential')
282     for _ in range(99):
283         placex_table.add(cls='amenity', typ='toilet')
284
285     temp_db_conn.commit()
286
287     result = sp_importer.get_classtype_pairs(100)
288     print("RESULT:", result)
289     assert ('highway', 'residential') in result
290     assert ('amenity', 'toilet') not in result