5 from unittest.mock import MagicMock
7 from nominatim_db.errors import UsageError
8 from nominatim_db.tools.special_phrases.sp_csv_loader import SPCsvLoader
9 from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
10 from nominatim_db.tools.special_phrases.sp_importer import SPImporter
13 def sample_style_file():
16 "keys" : ["emergency"],
18 "fire_hydrant" : "skip",
25 "keys" : ["historic", "military"],
33 "keys" : ["name:prefix", "name:suffix", "name:prefix:*", "name:suffix:*",
34 "name:botanical", "wikidata", "*:wikidata"],
40 "keys" : ["addr:housename"],
53 content = ",".join(json.dumps(entry) for entry in sample_data)
55 with tempfile.NamedTemporaryFile(mode='w+', delete=False) as tmp:
63 def test_get_sp_style(sample_style_file):
64 mock_config = MagicMock()
65 mock_config.get_import_style_file.return_value = sample_style_file
67 importer = SPImporter(config=mock_config, conn=None, sp_loader=None)
68 result = importer.get_sp_style()
71 ("highway", "motorway"),
74 assert result == expected
85 def test_create_classtype_table_and_indexes():
86 mock_config = MagicMock()
87 mock_config.TABLESPACE_AUX_DATA = ''
88 mock_config.DATABASE_WEBUSER = 'www-data'
90 mock_cursor = MagicMock()
91 mock_conn = MagicMock()
92 mock_conn.cursor.return_value.__enter__.return_value = mock_cursor
94 importer = SPImporter(config=mock_config, conn=mock_conn, sp_loader=None)
96 importer._create_place_classtype_table = MagicMock()
97 importer._create_place_classtype_indexes = MagicMock()
98 importer._grant_access_to_webuser = MagicMock()
99 importer.statistics_handler.notify_one_table_created = lambda: print("✓ Created table")
100 importer.statistics_handler.notify_one_table_ignored = lambda: print("⨉ Ignored table")
102 importer.table_phrases_to_delete = {"place_classtype_highway_motorway"}
104 test_pairs = [("highway", "motorway"), ("natural", "peak")]
105 importer._create_classtype_table_and_indexes(test_pairs)
107 print("create_place_classtype_table calls:")
108 for call in importer._create_place_classtype_table.call_args_list:
111 print("\ncreate_place_classtype_indexes calls:")
112 for call in importer._create_place_classtype_indexes.call_args_list:
115 print("\ngrant_access_to_webuser calls:")
116 for call in importer._grant_access_to_webuser.call_args_list:
122 config.TABLESPACE_AUX_DATA = ''
123 config.DATABASE_WEBUSER = 'www-data'
124 config.load_sub_configuration.return_value = {'blackList': {}, 'whiteList': {}}
128 def test_import_phrases_original(mock_config):
129 phrase = SpecialPhrase("roundabout", "highway", "motorway", "eq")
131 mock_conn = MagicMock()
132 mock_cursor = MagicMock()
133 mock_conn.cursor.return_value.__enter__.return_value = mock_cursor
134 mock_loader = MagicMock()
135 mock_loader.generate_phrases.return_value = [phrase]
137 mock_analyzer = MagicMock()
138 mock_tokenizer = MagicMock()
139 mock_tokenizer.name_analyzer.return_value.__enter__.return_value = mock_analyzer
141 importer = SPImporter(config=mock_config, conn=mock_conn, sp_loader=mock_loader)
142 importer._fetch_existing_place_classtype_tables = MagicMock()
143 importer._create_classtype_table_and_indexes = MagicMock()
144 importer._remove_non_existent_tables_from_db = MagicMock()
146 importer.import_phrases(tokenizer=mock_tokenizer, should_replace=True)
148 assert importer.word_phrases == {("roundabout", "highway", "motorway", "-")}
150 mock_analyzer.update_special_phrases.assert_called_once_with(
151 importer.word_phrases, True
155 def test_get_sp_filters_correctly(sample_style_file):
156 mock_config = MagicMock()
157 mock_config.get_import_style_file.return_value = sample_style_file
158 mock_config.load_sub_configuration.return_value = {"blackList": {}, "whiteList": {}}
160 importer = SPImporter(config=mock_config, conn=MagicMock(), sp_loader=None)
162 allowed_from_db = {("highway", "motorway"), ("historic", "castle")}
163 importer.get_sp_db = lambda: allowed_from_db
165 result = importer.get_sp()
167 expected = {("highway", "motorway")}
169 assert result == expected, f"Expected {expected}, got {result}"
171 def test_get_sp_db_filters_by_count_threshold(mock_config):
172 mock_cursor = MagicMock()
174 # Simulate only results above the threshold being returned (as SQL would)
175 # These tuples simulate real SELECT class, type FROM placex GROUP BY ... HAVING COUNT(*) > 100
176 mock_cursor.fetchall.return_value = [
177 ("highway", "motorway"),
178 ("historic", "castle")
181 mock_conn = MagicMock()
182 mock_conn.cursor.return_value.__enter__.return_value = mock_cursor
183 importer = SPImporter(config=mock_config, conn=mock_conn, sp_loader=None)
185 result = importer.get_sp_db()
188 ("highway", "motorway"),
189 ("historic", "castle")
192 assert result == expected
193 mock_cursor.execute.assert_called_once()