]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
Resolve conflicts
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SPImporter.
4 """
5 from nominatim.errors import UsageError
6 from pathlib import Path
7 import tempfile
8 from shutil import copyfile
9 import pytest
10 from nominatim.tools.special_phrases.sp_importer import SPImporter
11 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
12 from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
13 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
14
15 TEST_BASE_DIR = Path(__file__) / '..' / '..'
16
17 def test_fetch_existing_place_classtype_tables(sp_importer, temp_db_cursor):
18     """
19         Check for the fetch_existing_place_classtype_tables() method.
20         It should return the table just created.
21     """
22     temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')
23
24     sp_importer._fetch_existing_place_classtype_tables()
25     contained_table = sp_importer.table_phrases_to_delete.pop()
26     assert contained_table == 'place_classtype_testclasstypetable'
27
28 def test_check_sanity_class(sp_importer):
29     """
30         Check for _check_sanity() method.
31         If a wrong class or type is given, an UsageError should raise.
32         If a good class and type are given, nothing special happens.
33     """
34
35     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
36     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
37
38     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
39
40 def test_load_white_and_black_lists(sp_importer):
41     """
42         Test that _load_white_and_black_lists() well return
43         black list and white list and that they are of dict type.
44     """
45     black_list, white_list = sp_importer._load_white_and_black_lists()
46
47     assert isinstance(black_list, dict) and isinstance(white_list, dict)
48
49 def test_convert_php_settings(sp_importer):
50     """
51         Test that _convert_php_settings_if_needed() convert the given
52         php file to a json file.
53     """
54     php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
55
56     with tempfile.TemporaryDirectory() as temp_dir:
57         temp_settings = (Path(temp_dir) / 'phrase_settings.php').resolve()
58         copyfile(php_file, temp_settings)
59         sp_importer._convert_php_settings_if_needed(temp_settings)
60
61         assert (Path(temp_dir) / 'phrase_settings.json').is_file()
62
63 def test_convert_settings_wrong_file(sp_importer):
64     """
65         Test that _convert_php_settings_if_needed() raise an exception
66         if the given file is not a valid file.
67     """
68     with pytest.raises(UsageError, match='random_file is not a valid file.'):
69         sp_importer._convert_php_settings_if_needed('random_file')
70
71 def test_convert_settings_json_already_exist(sp_importer):
72     """
73         Test that if we give to '_convert_php_settings_if_needed' a php file path
74         and that a the corresponding json file already exists, it is returned.
75     """
76     php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
77     json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
78
79     returned = sp_importer._convert_php_settings_if_needed(php_file)
80
81     assert returned == json_file
82
83 def test_convert_settings_giving_json(sp_importer):
84     """
85         Test that if we give to '_convert_php_settings_if_needed' a json file path
86         the same path is directly returned
87     """
88     json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
89
90     returned = sp_importer._convert_php_settings_if_needed(json_file)
91
92     assert returned == json_file
93
94 def test_create_place_classtype_indexes(temp_db_conn, sp_importer):
95     """
96         Test that _create_place_classtype_indexes() create the
97         place_id index and centroid index on the right place_class_type table.
98     """
99     phrase_class = 'class'
100     phrase_type = 'type'
101     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
102
103     with temp_db_conn.cursor() as temp_db_cursor:
104         temp_db_cursor.execute("CREATE EXTENSION postgis;")
105         temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))
106
107     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
108
109     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
110
111 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
112     """
113         Test that _create_place_classtype_table() create
114         the right place_classtype table.
115     """
116     phrase_class = 'class'
117     phrase_type = 'type'
118     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
119
120     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
121
122 def test_grant_access_to_web_user(temp_db_conn, def_config, sp_importer):
123     """
124         Test that _grant_access_to_webuser() give
125         right access to the web user.
126     """
127     phrase_class = 'class'
128     phrase_type = 'type'
129     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
130
131     with temp_db_conn.cursor() as temp_db_cursor:
132         temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))
133
134     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
135
136     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
137
138 def test_create_place_classtype_table_and_indexes(
139         temp_db_conn, def_config, placex_table,
140         sp_importer):
141     """
142         Test that _create_place_classtype_table_and_indexes()
143         create the right place_classtype tables and place_id indexes
144         and centroid indexes and grant access to the web user
145         for the given set of pairs.
146     """
147     pairs = set([('class1', 'type1'), ('class2', 'type2')])
148
149     sp_importer._create_place_classtype_table_and_indexes(pairs)
150
151     for pair in pairs:
152         assert check_table_exist(temp_db_conn, pair[0], pair[1])
153         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
154         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
155
156 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
157                                              temp_db_conn):
158     """
159         Check for the remove_non_existent_phrases_from_db() method.
160
161         It should removed entries from the word table which are contained
162         in the words_phrases_to_delete set and not those also contained
163         in the words_phrases_still_exist set.
164
165         place_classtype tables contained in table_phrases_to_delete should
166         be deleted.
167     """
168     with temp_db_conn.cursor() as temp_db_cursor:
169         sp_importer.table_phrases_to_delete = {
170             'place_classtype_testclasstypetable_to_delete'
171         }
172
173         query_tables = """
174             SELECT table_name
175             FROM information_schema.tables
176             WHERE table_schema='public'
177             AND table_name like 'place_classtype_%';
178         """
179
180         sp_importer._remove_non_existent_tables_from_db()
181
182         temp_db_cursor.execute(query_tables)
183         tables_result = temp_db_cursor.fetchall()
184         assert (len(tables_result) == 1 and
185             tables_result[0][0] == 'place_classtype_testclasstypetable_to_keep'
186         )
187
188 @pytest.mark.parametrize("should_replace", [(True), (False)])
189 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
190                         placex_table, tokenizer_mock, should_replace):
191     """
192         Check that the main import_phrases() method is well executed.
193         It should create the place_classtype table, the place_id and centroid indexes,
194         grand access to the web user and executing the SQL functions for amenities.
195         It should also update the database well by deleting or preserving existing entries
196         of the database.
197     """
198     #Add some data to the database before execution in order to test
199     #what is deleted and what is preserved.
200     with temp_db_conn.cursor() as temp_db_cursor:
201         temp_db_cursor.execute("""
202             CREATE TABLE place_classtype_amenity_animal_shelter();
203             CREATE TABLE place_classtype_wrongclass_wrongtype();""")
204     
205     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
206                         mock_get_wiki_content)
207
208     tokenizer = tokenizer_mock()
209     sp_importer.import_phrases(tokenizer, should_replace)
210
211     assert len(tokenizer.analyser_cache['special_phrases']) == 18
212
213     class_test = 'aerialway'
214     type_test = 'zip_line'
215
216     assert check_table_exist(temp_db_conn, class_test, type_test)
217     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
218     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
219     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
220     if should_replace:
221         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
222
223     #Format (query, should_return_something_bool) use to easily execute all asserts
224     queries_tests = set()
225
226     #Used to check that correct place_classtype table already in the datase before is still there.
227     query_existing_table = """
228         SELECT table_name
229         FROM information_schema.tables
230         WHERE table_schema='public'
231         AND table_name = 'place_classtype_amenity_animal_shelter';
232     """
233     queries_tests.add((query_existing_table, True))
234
235     #Used to check that wrong place_classtype table was deleted from the database.
236     query_wrong_table = """
237         SELECT table_name
238         FROM information_schema.tables
239         WHERE table_schema='public'
240         AND table_name = 'place_classtype_wrongclass_wrongtype';
241     """
242     if should_replace:
243         queries_tests.add((query_wrong_table, False))
244
245     with temp_db_conn.cursor() as temp_db_cursor:
246         for query in queries_tests:
247             temp_db_cursor.execute(query[0])
248             if (query[1] == True):
249                 assert temp_db_cursor.fetchone()
250             else:
251                 assert not temp_db_cursor.fetchone()
252
253 def mock_get_wiki_content(self, lang):
254     """
255         Mock the _get_wiki_content() method to return
256         static xml test file content.
257     """
258     return get_test_xml_wiki_content()
259
260 def get_test_xml_wiki_content():
261     """
262         return the content of the static xml test file.
263     """
264     xml_test_content_path = (TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt').resolve()
265     with open(xml_test_content_path) as xml_content_reader:
266         return xml_content_reader.read()
267
268 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
269     """
270         Verify that the place_classtype table exists for the given
271         phrase_class and phrase_type.
272     """
273     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
274
275     with temp_db_conn.cursor() as temp_db_cursor:
276         temp_db_cursor.execute("""
277             SELECT *
278             FROM information_schema.tables
279             WHERE table_type='BASE TABLE'
280             AND table_name='{}'""".format(table_name))
281         return temp_db_cursor.fetchone()
282
283 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
284     """
285         Check that the web user has been granted right access to the
286         place_classtype table of the given phrase_class and phrase_type.
287     """
288     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
289
290     with temp_db_conn.cursor() as temp_db_cursor:
291         temp_db_cursor.execute("""
292                 SELECT * FROM information_schema.role_table_grants
293                 WHERE table_name='{}'
294                 AND grantee='{}'
295                 AND privilege_type='SELECT'""".format(table_name, user))
296         return temp_db_cursor.fetchone()
297
298 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
299     """
300         Check that the place_id index and centroid index exist for the
301         place_classtype table of the given phrase_class and phrase_type.
302     """
303     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
304
305     return (
306         temp_db_conn.index_exists(index_prefix + 'centroid')
307         and
308         temp_db_conn.index_exists(index_prefix + 'place_id')
309     )
310
311 @pytest.fixture
312 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
313     """
314         Return an instance of SPImporter.
315     """
316     loader = SPWikiLoader(def_config, ['en'])
317     return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
318
319 @pytest.fixture
320 def temp_phplib_dir_with_migration():
321     """
322         Return temporary phpdir with migration subdirectory and
323         PhraseSettingsToJson.php script inside.
324     """
325     migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
326                       / 'PhraseSettingsToJson.php').resolve()
327     with tempfile.TemporaryDirectory() as phpdir:
328         (Path(phpdir) / 'migration').mkdir()
329         migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
330         copyfile(migration_file, migration_dest_path)
331
332         yield Path(phpdir)
333
334 @pytest.fixture
335 def default_phrases(temp_db_cursor):
336     temp_db_cursor.execute("""
337         CREATE TABLE place_classtype_testclasstypetable_to_delete();
338         CREATE TABLE place_classtype_testclasstypetable_to_keep();""")