"""
Tests for the special-phrase import methods
of the class SPImporter.
"""
from pathlib import Path
from shutil import copyfile
import tempfile

import pytest

from nominatim.errors import UsageError
from nominatim.tools.special_phrases.sp_importer import SPImporter
from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
from nominatim.tools.special_phrases.special_phrase import SpecialPhrase

from cursor import CursorForTesting
@pytest.fixture
def testfile_dir(src_dir):
    """
    Return the path ``<src_dir>/test/testfiles``.

    Registered as a fixture because several tests in this module request
    ``testfile_dir`` by name.
    """
    return src_dir / 'test' / 'testfiles'
@pytest.fixture
def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
    Return an instance of SPImporter.

    The importer is built from the default configuration, the temporary
    PHP library directory and the temporary database connection. Its
    loader is an SPWikiLoader created with the language list ``['en']``.
    """
    wiki_loader = SPWikiLoader(def_config, ['en'])
    return SPImporter(def_config, temp_phplib_dir_with_migration,
                      temp_db_conn, wiki_loader)
@pytest.fixture
def temp_phplib_dir_with_migration(src_dir, tmp_path):
    """
    Return a temporary phpdir with a migration subdirectory and the
    PhraseSettingsToJson.php script inside.

    The script is copied from ``<src_dir>/lib-php/migration``; the
    returned path is ``<tmp_path>/tempphp``.
    """
    migration_file = (src_dir / 'lib-php' / 'migration' / 'PhraseSettingsToJson.php').resolve()

    phpdir = tmp_path / 'tempphp'
    migration_dir = phpdir / 'migration'
    # parents=True creates 'tempphp' as well; a plain mkdir() on the
    # subdirectory fails while its parent does not exist yet.
    migration_dir.mkdir(parents=True)

    migration_dest_path = (migration_dir / 'PhraseSettingsToJson.php').resolve()
    copyfile(str(migration_file), str(migration_dest_path))

    # Hand the directory to the requesting fixture/test; without this the
    # consumer (sp_importer) would receive None.
    return phpdir
@pytest.fixture
def xml_wiki_content(src_dir):
    """
    Return the content of the static xml test file
    ``<src_dir>/test/testdata/special_phrases_test_content.txt`` as text.
    """
    xml_test_content_path = (
        src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
    ).resolve()
    return xml_test_content_path.read_text()
@pytest.fixture
def default_phrases(table_factory):
    """
    Create two place_classtype tables: one named '..._to_delete' and one
    named '..._to_keep', for tests that check which tables get removed.
    """
    for table_name in ('place_classtype_testclasstypetable_to_delete',
                       'place_classtype_testclasstypetable_to_keep'):
        table_factory(table_name)
def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
    """
    Check for the fetch_existing_place_classtype_tables() method.
    It should return the table just created.
    """
    table_factory('place_classtype_testclasstypetable')

    sp_importer._fetch_existing_place_classtype_tables()

    # The result is collected in the importer's table_phrases_to_delete set.
    contained_table = sp_importer.table_phrases_to_delete.pop()
    assert contained_table == 'place_classtype_testclasstypetable'
def test_check_sanity_class(sp_importer):
    """
    Check for _check_sanity() method.
    If an empty class or an empty type is given, the result must be falsy.
    If a good class and type are given, the result must be truthy.
    """
    # Empty class.
    assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
    # Empty type.
    assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))

    assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
def test_load_white_and_black_lists(sp_importer):
    """
    Test that _load_white_and_black_lists() returns a black list and
    a white list and that both are of dict type.
    """
    black_list, white_list = sp_importer._load_white_and_black_lists()

    assert isinstance(black_list, dict) and isinstance(white_list, dict)
def test_convert_php_settings(sp_importer, testfile_dir):
    """
    Test that _convert_php_settings_if_needed() converts the given
    php file to a json file.
    """
    php_file = (testfile_dir / 'phrase_settings.php').resolve()

    # Work on a copy in a throw-away directory so the converted json file
    # is written next to the copy rather than into the test data directory.
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_settings = (Path(temp_dir) / 'phrase_settings.php').resolve()
        copyfile(php_file, temp_settings)
        sp_importer._convert_php_settings_if_needed(temp_settings)

        assert (Path(temp_dir) / 'phrase_settings.json').is_file()
def test_convert_settings_wrong_file(sp_importer):
    """
    Test that _convert_php_settings_if_needed() raises a UsageError
    if the given file is not a valid file.
    """
    with pytest.raises(UsageError, match='random_file is not a valid file.'):
        sp_importer._convert_php_settings_if_needed('random_file')
def test_convert_settings_json_already_exist(sp_importer, testfile_dir):
    """
    Test that if we give '_convert_php_settings_if_needed' a php file path
    and the corresponding json file already exists, the json path is returned.
    """
    php_file = (testfile_dir / 'phrase_settings.php').resolve()
    json_file = (testfile_dir / 'phrase_settings.json').resolve()

    returned = sp_importer._convert_php_settings_if_needed(php_file)

    assert returned == json_file
def test_convert_settings_giving_json(sp_importer, testfile_dir):
    """
    Test that if we give '_convert_php_settings_if_needed' a json file path,
    the same path is directly returned.
    """
    json_file = (testfile_dir / 'phrase_settings.json').resolve()

    returned = sp_importer._convert_php_settings_if_needed(json_file)

    assert returned == json_file
def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
                                        table_factory, sp_importer):
    """
    Test that _create_place_classtype_indexes() creates the
    place_id index and centroid index on the right place_class_type table.
    """
    phrase_class = 'class'
    # Placeholder type name; it only has to match between the table that is
    # created here and the indexes that are checked below.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')

    sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
    """
    Test that _create_place_classtype_table() creates
    the right place_classtype table.
    """
    phrase_class = 'class'
    # Placeholder type name; only used to derive the expected table name.
    phrase_type = 'type'

    sp_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
    """
    Test that _grant_access_to_webuser() gives
    the right access to the web user (def_config.DATABASE_WEBUSER).
    """
    phrase_class = 'class'
    # Placeholder type name; it only has to match between the created table
    # and the grant that is checked below.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    table_factory(table_name)

    sp_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        sp_importer):
    """
    Test that _create_place_classtype_table_and_indexes()
    creates the right place_classtype tables, place_id indexes
    and centroid indexes, and grants access to the web user
    for the given set of pairs.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    sp_importer._create_place_classtype_table_and_indexes(pairs)

    # Every (class, type) pair must have its table, both indexes and the grant.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)
def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
                                            temp_db_conn):
    """
    Check for the _remove_non_existent_tables_from_db() method.

    place_classtype tables contained in table_phrases_to_delete should
    be removed; place_classtype tables not listed there should be kept.
    """
    sp_importer.table_phrases_to_delete = {
        'place_classtype_testclasstypetable_to_delete'
    }

    # Lists the names of all place_classtype tables in the public schema.
    query_tables = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """

    sp_importer._remove_non_existent_tables_from_db()

    # Changes are not committed yet. Use temp_db_conn for checking results.
    with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
        assert cur.row_set(query_tables) \
                 == {('place_classtype_testclasstypetable_to_keep', )}
@pytest.mark.parametrize("should_replace", [(True), (False)])
def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
                        placex_table, table_factory, tokenizer_mock,
                        xml_wiki_content, should_replace):
    """
    Check that the main import_phrases() method is well executed.
    It should create the place_classtype table, the place_id and centroid indexes
    and grant access to the web user.
    It should also update the database well by deleting or preserving existing
    entries, depending on should_replace.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    table_factory('place_classtype_amenity_animal_shelter')
    table_factory('place_classtype_wrongclass_wrongtype')

    # Serve the static test content instead of fetching the wiki page.
    monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
                        lambda self, lang: xml_wiki_content)

    tokenizer = tokenizer_mock()
    sp_importer.import_phrases(tokenizer, should_replace)

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')

    assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
    # The pre-existing unrelated table may only be expected to vanish when
    # replacement was requested.
    # NOTE(review): guard reconstructed from the parametrization and the
    # docstring ("deleting or preserving") - confirm against the importer.
    if should_replace:
        assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
    Verify that the place_classtype table exists for the given
    phrase_class and phrase_type.

    Returns whatever ``temp_db_conn.table_exists()`` reports for the table
    named ``place_classtype_<phrase_class>_<phrase_type>``.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
    return temp_db_conn.table_exists(table_name)
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
    Check that the web user has been granted right access to the
    place_classtype table of the given phrase_class and phrase_type.

    Returns the first matching row of information_schema.role_table_grants
    (truthy when a SELECT grant for ``user`` exists on the table), or
    whatever the cursor's ``fetchone()`` yields when there is no match.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Both the table and the grantee must match; filtering on the table
        # alone would accept a SELECT grant given to any other role.
        temp_db_cursor.execute("""
                SELECT * FROM information_schema.role_table_grants
                WHERE table_name='{}'
                AND grantee='{}'
                AND privilege_type='SELECT'""".format(table_name, user))
        return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
    Check that the place_id index and centroid index exist for the
    place_classtype table of the given phrase_class and phrase_type.

    Returns a truthy value only when both
    ``idx_place_classtype_<class>_<type>_centroid`` and
    ``idx_place_classtype_<class>_<type>_place_id`` are reported by
    ``temp_db_conn.index_exists()``.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    # The result must be returned: callers use this helper inside `assert`.
    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )