]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
test: use src_dir fixture instead of self-computed paths
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SPImporter.
4 """
5 from nominatim.errors import UsageError
6 from pathlib import Path
7 import tempfile
8 from shutil import copyfile
9 import pytest
10 from nominatim.tools.special_phrases.sp_importer import SPImporter
11 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
12 from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
13 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
14
15 from cursor import CursorForTesting
16
17 @pytest.fixture
18 def testfile_dir(src_dir):
19     return src_dir / 'test' / 'testfiles'
20
21
22 @pytest.fixture
23 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
24     """
25         Return an instance of SPImporter.
26     """
27     loader = SPWikiLoader(def_config, ['en'])
28     return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
29
30
31 @pytest.fixture
32 def temp_phplib_dir_with_migration(src_dir, tmp_path):
33     """
34         Return temporary phpdir with migration subdirectory and
35         PhraseSettingsToJson.php script inside.
36     """
37     migration_file = (src_dir / 'lib-php' / 'migration' / 'PhraseSettingsToJson.php').resolve()
38
39     phpdir = tmp_path / 'tempphp'
40     phpdir.mkdir()
41
42     (phpdir / 'migration').mkdir()
43     migration_dest_path = (phpdir / 'migration' / 'PhraseSettingsToJson.php').resolve()
44     copyfile(str(migration_file), str(migration_dest_path))
45
46     return phpdir
47
48
49 @pytest.fixture
50 def xml_wiki_content(src_dir):
51     """
52         return the content of the static xml test file.
53     """
54     xml_test_content_path = (src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt').resolve()
55     return xml_test_content_path.read_text()
56
57
58 @pytest.fixture
59 def default_phrases(table_factory):
60     table_factory('place_classtype_testclasstypetable_to_delete')
61     table_factory('place_classtype_testclasstypetable_to_keep')
62
63
64 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
65     """
66         Check for the fetch_existing_place_classtype_tables() method.
67         It should return the table just created.
68     """
69     table_factory('place_classtype_testclasstypetable')
70
71     sp_importer._fetch_existing_place_classtype_tables()
72     contained_table = sp_importer.table_phrases_to_delete.pop()
73     assert contained_table == 'place_classtype_testclasstypetable'
74
75 def test_check_sanity_class(sp_importer):
76     """
77         Check for _check_sanity() method.
78         If a wrong class or type is given, an UsageError should raise.
79         If a good class and type are given, nothing special happens.
80     """
81
82     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
83     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
84
85     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
86
87 def test_load_white_and_black_lists(sp_importer):
88     """
89         Test that _load_white_and_black_lists() well return
90         black list and white list and that they are of dict type.
91     """
92     black_list, white_list = sp_importer._load_white_and_black_lists()
93
94     assert isinstance(black_list, dict) and isinstance(white_list, dict)
95
96 def test_convert_php_settings(sp_importer, testfile_dir):
97     """
98         Test that _convert_php_settings_if_needed() convert the given
99         php file to a json file.
100     """
101     php_file = (testfile_dir / 'phrase_settings.php').resolve()
102
103     with tempfile.TemporaryDirectory() as temp_dir:
104         temp_settings = (Path(temp_dir) / 'phrase_settings.php').resolve()
105         copyfile(php_file, temp_settings)
106         sp_importer._convert_php_settings_if_needed(temp_settings)
107
108         assert (Path(temp_dir) / 'phrase_settings.json').is_file()
109
110 def test_convert_settings_wrong_file(sp_importer):
111     """
112         Test that _convert_php_settings_if_needed() raise an exception
113         if the given file is not a valid file.
114     """
115     with pytest.raises(UsageError, match='random_file is not a valid file.'):
116         sp_importer._convert_php_settings_if_needed('random_file')
117
118 def test_convert_settings_json_already_exist(sp_importer, testfile_dir):
119     """
120         Test that if we give to '_convert_php_settings_if_needed' a php file path
121         and that a the corresponding json file already exists, it is returned.
122     """
123     php_file = (testfile_dir / 'phrase_settings.php').resolve()
124     json_file = (testfile_dir / 'phrase_settings.json').resolve()
125
126     returned = sp_importer._convert_php_settings_if_needed(php_file)
127
128     assert returned == json_file
129
130 def test_convert_settings_giving_json(sp_importer, testfile_dir):
131     """
132         Test that if we give to '_convert_php_settings_if_needed' a json file path
133         the same path is directly returned
134     """
135     json_file = (testfile_dir / 'phrase_settings.json').resolve()
136
137     returned = sp_importer._convert_php_settings_if_needed(json_file)
138
139     assert returned == json_file
140
141 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
142                                         table_factory, sp_importer):
143     """
144         Test that _create_place_classtype_indexes() create the
145         place_id index and centroid index on the right place_class_type table.
146     """
147     phrase_class = 'class'
148     phrase_type = 'type'
149     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
150
151     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
152
153     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
154
155     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
156
157 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
158     """
159         Test that _create_place_classtype_table() create
160         the right place_classtype table.
161     """
162     phrase_class = 'class'
163     phrase_type = 'type'
164     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
165
166     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
167
168 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
169     """
170         Test that _grant_access_to_webuser() give
171         right access to the web user.
172     """
173     phrase_class = 'class'
174     phrase_type = 'type'
175     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
176
177     table_factory(table_name)
178
179     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
180
181     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
182
183 def test_create_place_classtype_table_and_indexes(
184         temp_db_conn, def_config, placex_table,
185         sp_importer):
186     """
187         Test that _create_place_classtype_table_and_indexes()
188         create the right place_classtype tables and place_id indexes
189         and centroid indexes and grant access to the web user
190         for the given set of pairs.
191     """
192     pairs = set([('class1', 'type1'), ('class2', 'type2')])
193
194     sp_importer._create_place_classtype_table_and_indexes(pairs)
195
196     for pair in pairs:
197         assert check_table_exist(temp_db_conn, pair[0], pair[1])
198         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
199         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
200
201 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
202                                             temp_db_conn):
203     """
204         Check for the remove_non_existent_phrases_from_db() method.
205
206         It should removed entries from the word table which are contained
207         in the words_phrases_to_delete set and not those also contained
208         in the words_phrases_still_exist set.
209
210         place_classtype tables contained in table_phrases_to_delete should
211         be deleted.
212     """
213     sp_importer.table_phrases_to_delete = {
214         'place_classtype_testclasstypetable_to_delete'
215     }
216
217     query_tables = """
218         SELECT table_name
219         FROM information_schema.tables
220         WHERE table_schema='public'
221         AND table_name like 'place_classtype_%';
222     """
223
224     sp_importer._remove_non_existent_tables_from_db()
225
226     # Changes are not committed yet. Use temp_db_conn for checking results.
227     with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
228         assert cur.row_set(query_tables) \
229                  == {('place_classtype_testclasstypetable_to_keep', )}
230
231
232 @pytest.mark.parametrize("should_replace", [(True), (False)])
233 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
234                         placex_table, table_factory, tokenizer_mock,
235                         xml_wiki_content, should_replace):
236     """
237         Check that the main import_phrases() method is well executed.
238         It should create the place_classtype table, the place_id and centroid indexes,
239         grand access to the web user and executing the SQL functions for amenities.
240         It should also update the database well by deleting or preserving existing entries
241         of the database.
242     """
243     #Add some data to the database before execution in order to test
244     #what is deleted and what is preserved.
245     table_factory('place_classtype_amenity_animal_shelter')
246     table_factory('place_classtype_wrongclass_wrongtype')
247
248     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
249                         lambda self, lang: xml_wiki_content)
250
251     tokenizer = tokenizer_mock()
252     sp_importer.import_phrases(tokenizer, should_replace)
253
254     assert len(tokenizer.analyser_cache['special_phrases']) == 18
255
256     class_test = 'aerialway'
257     type_test = 'zip_line'
258
259     assert check_table_exist(temp_db_conn, class_test, type_test)
260     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
261     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
262     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
263     if should_replace:
264         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
265
266     assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
267     if should_replace:
268         assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
269
270 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
271     """
272         Verify that the place_classtype table exists for the given
273         phrase_class and phrase_type.
274     """
275     return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
276
277
278 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
279     """
280         Check that the web user has been granted right access to the
281         place_classtype table of the given phrase_class and phrase_type.
282     """
283     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
284
285     with temp_db_conn.cursor() as temp_db_cursor:
286         temp_db_cursor.execute("""
287                 SELECT * FROM information_schema.role_table_grants
288                 WHERE table_name='{}'
289                 AND grantee='{}'
290                 AND privilege_type='SELECT'""".format(table_name, user))
291         return temp_db_cursor.fetchone()
292
293 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
294     """
295         Check that the place_id index and centroid index exist for the
296         place_classtype table of the given phrase_class and phrase_type.
297     """
298     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
299
300     return (
301         temp_db_conn.index_exists(index_prefix + 'centroid')
302         and
303         temp_db_conn.index_exists(index_prefix + 'place_id')
304     )