]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
d2693a9a702251fb8b0e9d4e7c58720424f18e2c
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases functions
3 """
4 import pytest
5 from nominatim.tools.special_phrases import _create_place_classtype_indexes, _create_place_classtype_table, _get_wiki_content, _grant_access_to_webuser, _process_amenity
6
7 def test_get_wiki_content():
8     assert _get_wiki_content('fr')
9
10 def execute_and_verify_add_word(temp_db_conn, phrase_label, normalized_label, 
11                                 phrase_class, phrase_type):
12     _process_amenity(temp_db_conn, phrase_label, normalized_label,
13                      phrase_class, phrase_type, '')
14
15     with temp_db_conn.cursor() as temp_db_cursor:
16         temp_db_cursor.execute(f"""
17             SELECT * FROM word 
18             WHERE word_token=' {normalized_label}'
19             AND word='{normalized_label}'
20             AND class='{phrase_class}'
21             AND type='{phrase_type}'
22             AND type='{phrase_type}'""")
23         return temp_db_cursor.fetchone()
24
25 def execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label,
26                                               phrase_class, phrase_type, phrase_operator):
27     _process_amenity(temp_db_conn, phrase_label, normalized_label,
28                      phrase_class, phrase_type, phrase_operator)
29
30     with temp_db_conn.cursor() as temp_db_cursor:
31         temp_db_cursor.execute(f"""
32             SELECT * FROM word 
33             WHERE word_token=' {normalized_label}'
34             AND word='{normalized_label}'
35             AND class='{phrase_class}'
36             AND type='{phrase_type}'
37             AND operator='{phrase_operator}'""")
38         return temp_db_cursor.fetchone()
39
40 def test_process_amenity_with_near_operator(temp_db_conn, word_table, amenity_operator_funcs):
41     phrase_label = ' label    '
42     normalized_label = 'label'
43     phrase_class = 'class'
44     phrase_type = 'type'
45
46     assert execute_and_verify_add_word(temp_db_conn, phrase_label, normalized_label, 
47                                        phrase_class, phrase_type)
48     assert execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label, 
49                                                      phrase_class, phrase_type, 'near')
50     assert execute_and_verify_add_word_with_operator(temp_db_conn, phrase_label, normalized_label, 
51                                                      phrase_class, phrase_type, 'in')
52
53 def index_exists(db_connect, index):
54         """ Check that an index with the given name exists in the database.
55         """
56         with db_connect.cursor() as cur:
57             cur.execute("""SELECT tablename FROM pg_indexes
58                            WHERE indexname = %s and schemaname = 'public'""", (index, ))
59             if cur.rowcount == 0:
60                 return False
61         return True
62
63 def test_create_place_classtype_indexes(temp_db_conn):
64     phrase_class = 'class'
65     phrase_type = 'type'
66     table_name = f'place_classtype_{phrase_class}_{phrase_type}'
67
68     with temp_db_conn.cursor() as temp_db_cursor:
69         temp_db_cursor.execute("CREATE EXTENSION postgis;")
70         temp_db_cursor.execute(f'CREATE TABLE {table_name}(place_id BIGINT, centroid GEOMETRY)')
71
72     _create_place_classtype_indexes(temp_db_conn, '', phrase_class, phrase_type)
73
74     centroid_index_exists = index_exists(temp_db_conn, f'idx_place_classtype_{phrase_class}_{phrase_type}_centroid')
75     place_id_index_exists = index_exists(temp_db_conn, f'idx_place_classtype_{phrase_class}_{phrase_type}_place_id')
76
77     assert centroid_index_exists and place_id_index_exists
78
79 def test_create_place_classtype_table(temp_db_conn, placex_table):
80     phrase_class = 'class'
81     phrase_type = 'type'
82     _create_place_classtype_table(temp_db_conn, '', phrase_class, phrase_type)
83
84     with temp_db_conn.cursor() as temp_db_cursor:
85         temp_db_cursor.execute(f"""
86             SELECT *
87             FROM information_schema.tables
88             WHERE table_type='BASE TABLE'
89             AND table_name='place_classtype_{phrase_class}_{phrase_type}'""")
90         result = temp_db_cursor.fetchone()
91     assert result
92
93 def test_grant_access_to_web_user(temp_db_conn, def_config):
94     phrase_class = 'class'
95     phrase_type = 'type'
96     table_name = f'place_classtype_{phrase_class}_{phrase_type}'
97
98     with temp_db_conn.cursor() as temp_db_cursor:
99         temp_db_cursor.execute(f'CREATE TABLE {table_name}()')
100
101     _grant_access_to_webuser(temp_db_conn, def_config, phrase_class, phrase_type)
102
103     with temp_db_conn.cursor() as temp_db_cursor:
104         temp_db_cursor.execute(f"""
105                 SELECT * FROM information_schema.role_table_grants
106                 WHERE table_name='{table_name}' 
107                 AND grantee='{def_config.DATABASE_WEBUSER}' 
108                 AND privilege_type='SELECT'""")
109         result = temp_db_cursor.fetchone()
110     assert result
111
112 @pytest.fixture
113 def amenity_operator_funcs(temp_db_cursor):                        
114     temp_db_cursor.execute(f"""
115         CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT
116         AS $$
117         DECLARE
118         o TEXT;
119         BEGIN
120         RETURN trim(name); --Basically return only the trimed name for the tests
121         END;
122         $$
123         LANGUAGE plpgsql IMMUTABLE;
124
125         CREATE SEQUENCE seq_word start 1;
126
127         CREATE OR REPLACE FUNCTION getorcreate_amenity(lookup_word TEXT, normalized_word TEXT,
128                                                     lookup_class text, lookup_type text)
129         RETURNS INTEGER
130         AS $$
131         DECLARE
132         lookup_token TEXT;
133         return_word_id INTEGER;
134         BEGIN
135         lookup_token := ' '||trim(lookup_word);
136         SELECT min(word_id) FROM word
137         WHERE word_token = lookup_token and word = normalized_word
138                 and class = lookup_class and type = lookup_type
139         INTO return_word_id;
140         IF return_word_id IS NULL THEN
141             return_word_id := nextval('seq_word');
142             INSERT INTO word VALUES (return_word_id, lookup_token, normalized_word,
143                                     lookup_class, lookup_type, null, 0);
144         END IF;
145         RETURN return_word_id;
146         END;
147         $$
148         LANGUAGE plpgsql;
149
150         CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT,
151                                                        normalized_word TEXT,
152                                                        lookup_class text,
153                                                        lookup_type text,
154                                                        op text)
155         RETURNS INTEGER
156         AS $$
157         DECLARE
158         lookup_token TEXT;
159         return_word_id INTEGER;
160         BEGIN
161         lookup_token := ' '||trim(lookup_word);
162         SELECT min(word_id) FROM word
163         WHERE word_token = lookup_token and word = normalized_word
164                 and class = lookup_class and type = lookup_type and operator = op
165         INTO return_word_id;
166         IF return_word_id IS NULL THEN
167             return_word_id := nextval('seq_word');
168             INSERT INTO word VALUES (return_word_id, lookup_token, normalized_word,
169                                     lookup_class, lookup_type, null, 0, op);
170         END IF;
171         RETURN return_word_id;
172         END;
173         $$
174         LANGUAGE plpgsql;""")