"""
    Tests for the special phrases import methods
    of the class SPImporter.
"""
from shutil import copyfile

import pytest

from nominatim.tools.special_phrases.sp_importer import SPImporter
from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
from nominatim.errors import UsageError

from cursor import CursorForTesting
@pytest.fixture
def testfile_dir(src_dir):
    """
        Return the path `test/testfiles` below the source directory.
    """
    return src_dir / 'test' / 'testfiles'
@pytest.fixture
def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
        Return an instance of SPImporter.

        The importer is built from the default configuration, the
        temporary php directory, the test database connection and an
        SPWikiLoader created with the language list ['en'].
    """
    loader = SPWikiLoader(def_config, ['en'])
    return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
@pytest.fixture
def temp_phplib_dir_with_migration(src_dir, tmp_path):
    """
        Return temporary phpdir with migration subdirectory and
        PhraseSettingsToJson.php script inside.
    """
    migration_file = (src_dir / 'lib-php' / 'migration' / 'PhraseSettingsToJson.php').resolve()

    phpdir = tmp_path / 'tempphp'
    migration_dir = phpdir / 'migration'
    # parents=True also creates phpdir itself, which does not exist yet
    # below the fresh tmp_path.
    migration_dir.mkdir(parents=True)

    migration_dest_path = (migration_dir / 'PhraseSettingsToJson.php').resolve()
    copyfile(str(migration_file), str(migration_dest_path))

    return phpdir
@pytest.fixture
def xml_wiki_content(src_dir):
    """
        Return the content of the static xml test file
        `test/testdata/special_phrases_test_content.txt` as text.
    """
    xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
    return xml_test_content.read_text()
@pytest.fixture
def default_phrases(table_factory):
    """
        Create two place_classtype tables through table_factory:
        one named '..._to_delete' and one named '..._to_keep'.
    """
    table_factory('place_classtype_testclasstypetable_to_delete')
    table_factory('place_classtype_testclasstypetable_to_keep')
def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
    """
        Check the _fetch_existing_place_classtype_tables() method.
        After the call, the table created here must be found in
        table_phrases_to_delete.
    """
    expected_table = 'place_classtype_testclasstypetable'
    table_factory(expected_table)

    sp_importer._fetch_existing_place_classtype_tables()

    assert sp_importer.table_phrases_to_delete.pop() == expected_table
def test_check_sanity_class(sp_importer):
    """
        Check the _check_sanity() method.
        A phrase with an empty class or an empty type must yield a falsy
        result, a phrase with both class and type set a truthy one.
    """
    rejected_phrases = (
        SpecialPhrase('en', '', 'type', ''),
        SpecialPhrase('en', 'class', '', ''),
    )
    for phrase in rejected_phrases:
        assert not sp_importer._check_sanity(phrase)

    accepted_phrase = SpecialPhrase('en', 'class', 'type', '')
    assert sp_importer._check_sanity(accepted_phrase)
def test_load_white_and_black_lists(sp_importer):
    """
        Test that _load_white_and_black_lists() returns a black list
        and a white list and that both are of dict type.
    """
    loaded_lists = sp_importer._load_white_and_black_lists()
    black_list, white_list = loaded_lists

    assert isinstance(black_list, dict)
    assert isinstance(white_list, dict)
def test_convert_php_settings(sp_importer, testfile_dir, tmp_path):
    """
        Test that _convert_php_settings_if_needed() converts the given
        php file into a json file located in the same directory.
    """
    source_php = (testfile_dir / 'phrase_settings.php').resolve()
    working_copy = (tmp_path / 'phrase_settings.php').resolve()
    # Work on a copy in tmp_path so that the json file is written there.
    copyfile(source_php, working_copy)

    sp_importer._convert_php_settings_if_needed(working_copy)

    expected_json = tmp_path / 'phrase_settings.json'
    assert expected_json.is_file()
def test_convert_settings_wrong_file(sp_importer):
    """
        Test that _convert_php_settings_if_needed() raises a UsageError
        when the given path is not a valid file.
    """
    expect_usage_error = pytest.raises(UsageError,
                                       match='random_file is not a valid file.')
    with expect_usage_error:
        sp_importer._convert_php_settings_if_needed('random_file')
def test_convert_settings_json_already_exist(sp_importer, testfile_dir):
    """
        Test that '_convert_php_settings_if_needed' returns the path of the
        json file when it is given a php file path whose corresponding
        json file already exists.
    """
    existing_json = (testfile_dir / 'phrase_settings.json').resolve()
    php_settings = (testfile_dir / 'phrase_settings.php').resolve()

    result = sp_importer._convert_php_settings_if_needed(php_settings)

    assert result == existing_json
def test_convert_settings_giving_json(sp_importer, testfile_dir):
    """
        Test that '_convert_php_settings_if_needed' hands back the very
        same path when it is given a json file path.
    """
    settings_path = (testfile_dir / 'phrase_settings.json').resolve()

    result = sp_importer._convert_php_settings_if_needed(settings_path)

    assert result == settings_path
def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
                                        table_factory, sp_importer):
    """
        Test that _create_place_classtype_indexes() creates the
        place_id index and centroid index on the right place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    # The table needs both columns the indexes are created on.
    table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')

    sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
    """
        Test that _create_place_classtype_table() creates
        the right place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'

    sp_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
    """
        Test that _grant_access_to_webuser() gives
        the right access to the web user.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    table_factory(table_name)

    sp_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        sp_importer):
    """
        Test that _create_place_classtype_table_and_indexes()
        creates the right place_classtype tables and place_id indexes
        and centroid indexes and grants access to the web user
        for the given set of pairs.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    sp_importer._create_place_classtype_table_and_indexes(pairs)

    # Every (class, type) pair must have received its table, both
    # indexes and the grant for the web user.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)
def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
                                            temp_db_conn):
    """
        Check the _remove_non_existent_tables_from_db() method.

        place_classtype tables contained in table_phrases_to_delete should
        be deleted, the other place_classtype table created by the
        default_phrases fixture must remain.
    """
    sp_importer.table_phrases_to_delete = {
        'place_classtype_testclasstypetable_to_delete'
    }

    query_tables = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """

    sp_importer._remove_non_existent_tables_from_db()

    # Changes are not committed yet. Use temp_db_conn for checking results.
    with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
        assert cur.row_set(query_tables) \
                 == {('place_classtype_testclasstypetable_to_keep', )}
@pytest.mark.parametrize("should_replace", [True, False])
def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
                        placex_table, table_factory, tokenizer_mock,
                        xml_wiki_content, should_replace):
    """
        Check that the main import_phrases() method is well executed.
        It should create the place_classtype table, the place_id and centroid indexes,
        grant access to the web user and execute the SQL functions for amenities.
        It should also update the database well by deleting or preserving existing entries.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    table_factory('place_classtype_amenity_animal_shelter')
    table_factory('place_classtype_wrongclass_wrongtype')

    # Replace SPWikiLoader._get_wiki_content so that it returns the
    # static test content for every language.
    monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
                        lambda self, lang: xml_wiki_content)

    tokenizer = tokenizer_mock()
    sp_importer.import_phrases(tokenizer, should_replace)

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)

    # The pre-existing table must survive the import in both modes.
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')

    # Removal of stale tables is only expected when replacing.
    if should_replace:
        # NOTE(review): this looks up 'place_classtype_wrong_class_wrong_type',
        # which is never created above; the check on the table that was
        # actually created follows.
        assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
        assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
        Tell whether the place_classtype table for the given
        phrase_class and phrase_type exists, as reported by
        temp_db_conn.table_exists().
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
    return temp_db_conn.table_exists(table_name)
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
        Check that the given user has been granted SELECT access to the
        place_classtype table of the given phrase_class and phrase_type.

        Returns the first matching row of
        information_schema.role_table_grants, or whatever fetchone()
        yields when there is no match.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Table name and grantee are handed over as query parameters so
        # that the database driver takes care of the quoting.
        temp_db_cursor.execute("""
                SELECT * FROM information_schema.role_table_grants
                WHERE table_name=%s
                AND grantee=%s
                AND privilege_type='SELECT'""", (table_name, user))
        return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
        Check that the place_id index and centroid index exist for the
        place_classtype table of the given phrase_class and phrase_type.

        Returns True only when temp_db_conn.index_exists() confirms
        both indexes.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    return bool(
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )