""" Update postcode tokens in the word table from the location_postcode
table.
"""
- to_delete = []
+ analyzer = self.token_analysis.analysis.get('@postcode')
+
with self.conn.cursor() as cur:
- # This finds us the rows in location_postcode and word that are
- # missing in the other table.
- cur.execute("""SELECT * FROM
- (SELECT pc, word FROM
- (SELECT distinct(postcode) as pc FROM location_postcode) p
- FULL JOIN
- (SELECT word FROM word WHERE type = 'P') w
- ON pc = word) x
- WHERE pc is null or word is null""")
-
- with CopyBuffer() as copystr:
- for postcode, word in cur:
- if postcode is None:
- to_delete.append(word)
- else:
- copystr.add(self._search_normalized(postcode),
- 'P', postcode)
-
- if to_delete:
- cur.execute("""DELETE FROM WORD
- WHERE type ='P' and word = any(%s)
- """, (to_delete, ))
-
- copystr.copy_out(cur, 'word',
- columns=['word_token', 'type', 'word'])
+ # First get all postcode names currently in the word table.
+ cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
+ word_entries = set((entry[0] for entry in cur))
+
+ # Then compute the required postcode names from the postcode table.
+ needed_entries = set()
+ cur.execute("SELECT country_code, postcode FROM location_postcode")
+ for cc, postcode in cur:
+ info = PlaceInfo({'country_code': cc,
+ 'class': 'place', 'type': 'postcode',
+ 'address': {'postcode': postcode}})
+ address = self.sanitizer.process_names(info)[1]
+ for place in address:
+ if place.kind == 'postcode':
+ if analyzer is None:
+ postcode_name = place.name.strip().upper()
+ variant_base = None
+ else:
+ postcode_name = analyzer.normalize(place.name)
+ variant_base = place.get_attr("variant")
+
+ if variant_base:
+ needed_entries.add(f'{postcode_name}@{variant_base}')
+ else:
+ needed_entries.add(postcode_name)
+ break
+
+ # Now update the word table.
+ self._delete_unused_postcode_words(word_entries - needed_entries)
+ self._add_missing_postcode_words(needed_entries - word_entries)
+
+ def _delete_unused_postcode_words(self, tokens):
+     """ Remove the given postcode words from the word table.
+
+         'tokens' is an iterable of values from the 'word' column;
+         only rows of type 'P' (postcode) are affected.
+     """
+     if tokens:
+         with self.conn.cursor() as cur:
+             cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
+                         (list(tokens), ))
+
+ def _add_missing_postcode_words(self, tokens):
+     """ Add the given postcode words to the word table.
+
+         Each token is either a plain canonical postcode or a
+         '<canonical>@<variant base>' combination as produced by
+         update_postcodes_from_db(). For the latter, search variants
+         are generated from the variant base via the '@postcode'
+         token analysis module (when configured).
+     """
+     if not tokens:
+         return
+
+     analyzer = self.token_analysis.analysis.get('@postcode')
+     terms = []
+
+     for postcode_name in tokens:
+         if '@' in postcode_name:
+             # Split only at the first '@' (maxsplit=1) so a variant
+             # base that itself contains '@' stays intact instead of
+             # raising a ValueError on unpacking.
+             term, variant = postcode_name.split('@', 1)
+             term = self._search_normalized(term)
+             variants = {term}
+             if analyzer is not None:
+                 variants.update(analyzer.get_variants_ascii(variant))
+             variants = list(variants)
+         else:
+             variants = [self._search_normalized(postcode_name)]
+         terms.append((postcode_name, variants))
+
+     if terms:
+         with self.conn.cursor() as cur:
+             cur.execute_values("""SELECT create_postcode_word(pc, var)
+                                   FROM (VALUES %s) AS v(pc, var)""",
+                                terms)
+
+
def update_special_phrases(self, phrases, should_replace):
postcode_name = analyzer.normalize(item.name)
variant_base = item.get_attr("variant")
- if variant_base is not None:
+ if variant_base:
postcode = f'{postcode_name}@{variant_base}'
else:
postcode = postcode_name
return None
variants = {term}
- if analyzer is not None and variant_base is not None:
+ if analyzer is not None and variant_base:
variants.update(analyzer.get_variants_ascii(variant_base))
with self.conn.cursor() as cur:
"""
return PostcodeTokenAnalysis(normalizer, transliterator)
+
class PostcodeTokenAnalysis:
- """ Detects common housenumber patterns and normalizes them.
+ """ Special normalization and variant generation for postcodes.
+
+ This analyzer must not be used with anything but postcodes as
+ it follows some special rules: `normalize` doesn't necessarily
+ need to return a standard form as per normalization rules. It
+ needs to return the canonical form of the postcode that is also
+ used for output. `get_variants_ascii` then needs to ensure that
+ the generated variants once more follow the standard normalization
+ and transliteration, so that postcodes are correctly recognised by
+ the search algorithm.
"""
def __init__(self, norm, trans):
self.norm = norm
def get_variants_ascii(self, norm_name):
""" Compute the spelling variants for the given normalized postcode.
- The official form creates one variant. If a 'lookup version' is
- given, then it will create variants with optional spaces.
+ Takes the canonical form of the postcode, normalizes it using the
+ standard rules and then creates variants of the result where
+ all spaces are optional.
"""
# Postcodes follow their own transliteration rules.
# Make sure at this point, that the terms are normalized in a way
# that they are searchable with the standard transliteration rules.
return [self.trans.transliterate(term) for term in
- self.mutator.generate([self.norm.transliterate(norm_name)])]
+ self.mutator.generate([self.norm.transliterate(norm_name)]) if term]
def check_database_integrity(context):
""" Check some generic constraints on the tables.
"""
- # place_addressline should not have duplicate (place_id, address_place_id)
- cur = context.db.cursor()
- cur.execute("""SELECT count(*) FROM
- (SELECT place_id, address_place_id, count(*) as c
- FROM place_addressline GROUP BY place_id, address_place_id) x
- WHERE c > 1""")
- assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
+ with context.db.cursor() as cur:
+ # place_addressline should not have duplicate (place_id, address_place_id)
+ cur.execute("""SELECT count(*) FROM
+ (SELECT place_id, address_place_id, count(*) as c
+ FROM place_addressline GROUP BY place_id, address_place_id) x
+ WHERE c > 1""")
+ assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
+
+ # word table must not have empty word_tokens
+ cur.execute("SELECT count(*) FROM word WHERE word_token = ''")
+ assert cur.fetchone()[0] == 0, "Empty word tokens found in word table"
+
################################ GIVEN ##################################
def _mk_analyser(norm=("[[:Punctuation:][:Space:]]+ > ' '",), trans=(':: upper()',),
variants=('~gasse -> gasse', 'street => st', ),
- sanitizers=[], with_housenumber=False):
+ sanitizers=[], with_housenumber=False,
+ with_postcode=False):
cfgstr = {'normalization': list(norm),
'sanitizers': sanitizers,
'transliteration': list(trans),
if with_housenumber:
cfgstr['token-analysis'].append({'id': '@housenumber',
'analyzer': 'housenumbers'})
+ if with_postcode:
+ cfgstr['token-analysis'].append({'id': '@postcode',
+ 'analyzer': 'postcodes'})
(test_config.project_dir / 'icu_tokenizer.yaml').write_text(yaml.dump(cfgstr))
tok.loader = nominatim.tokenizer.icu_rule_loader.ICURuleLoader(test_config)
anl.normalize_postcode('38 Б') == '38 Б'
-def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table):
- table_factory('location_postcode', 'postcode TEXT',
- content=(('1234',), ('12 34',), ('AB23',), ('1234',)))
+class TestPostcodes:
- with analyzer() as anl:
- anl.update_postcodes_from_db()
+ @pytest.fixture(autouse=True)
+ def setup(self, analyzer, sql_functions):
+ sanitizers = [{'step': 'clean-postcodes'}]
+ with analyzer(sanitizers=sanitizers, with_postcode=True) as anl:
+ self.analyzer = anl
+ yield anl
- assert word_table.count() == 3
- assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
+ def process_postcode(self, cc, postcode):
+ return self.analyzer.process_place(PlaceInfo({'country_code': cc,
+ 'address': {'postcode': postcode}}))
-def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_table):
- table_factory('location_postcode', 'postcode TEXT',
- content=(('1234',), ('45BC', ), ('XX45', )))
- word_table.add_postcode(' 1234', '1234')
- word_table.add_postcode(' 5678', '5678')
- with analyzer() as anl:
- anl.update_postcodes_from_db()
+ def test_update_postcodes_from_db_empty(self, table_factory, word_table):
+ table_factory('location_postcode', 'country_code TEXT, postcode TEXT',
+ content=(('de', '12345'), ('se', '132 34'),
+ ('bm', 'AB23'), ('fr', '12345')))
+
+ self.analyzer.update_postcodes_from_db()
+
+ assert word_table.count() == 5
+ assert word_table.get_postcodes() == {'12345', '132 34@132 34', 'AB 23@AB 23'}
+
+
+ def test_update_postcodes_from_db_ambigious(self, table_factory, word_table):
+ table_factory('location_postcode', 'country_code TEXT, postcode TEXT',
+ content=(('in', '123456'), ('sg', '123456')))
+
+ self.analyzer.update_postcodes_from_db()
+
+ assert word_table.count() == 3
+ assert word_table.get_postcodes() == {'123456', '123456@123 456'}
+
+
+ def test_update_postcodes_from_db_add_and_remove(self, table_factory, word_table):
+ table_factory('location_postcode', 'country_code TEXT, postcode TEXT',
+ content=(('ch', '1234'), ('bm', 'BC 45'), ('bm', 'XX45')))
+ word_table.add_postcode(' 1234', '1234')
+ word_table.add_postcode(' 5678', '5678')
+
+ self.analyzer.update_postcodes_from_db()
+
+ assert word_table.count() == 5
+ assert word_table.get_postcodes() == {'1234', 'BC 45@BC 45', 'XX 45@XX 45'}
+
+
+ def test_process_place_postcode_simple(self, word_table):
+ info = self.process_postcode('de', '12345')
+
+ assert info['postcode'] == '12345'
+
+ assert word_table.get_postcodes() == {'12345', }
+
+
+ def test_process_place_postcode_with_space(self, word_table):
+ info = self.process_postcode('in', '123 567')
+
+ assert info['postcode'] == '123567'
+
+ assert word_table.get_postcodes() == {'123567@123 567', }
- assert word_table.count() == 3
- assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
def test_update_special_phrase_empty_table(analyzer, word_table):
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for special postcode analysis and variant generation.
+"""
+import pytest
+
+from icu import Transliterator
+
+import nominatim.tokenizer.token_analysis.postcodes as module
+from nominatim.errors import UsageError
+
+DEFAULT_NORMALIZATION = """ :: NFD ();
+ '🜳' > ' ';
+ [[:Nonspacing Mark:] [:Cf:]] >;
+ :: lower ();
+ [[:Punctuation:][:Space:]]+ > ' ';
+ :: NFC ();
+ """
+
+DEFAULT_TRANSLITERATION = """ :: Latin ();
+ '🜵' > ' ';
+ """
+
+@pytest.fixture
+def analyser():
+    """ Create a postcode token-analysis module configured with the
+        default normalization and transliteration rules above.
+    """
+    rules = { 'analyzer': 'postcodes'}
+    config = module.configure(rules, DEFAULT_NORMALIZATION)
+
+    trans = Transliterator.createFromRules("test_trans", DEFAULT_TRANSLITERATION)
+    norm = Transliterator.createFromRules("test_norm", DEFAULT_NORMALIZATION)
+
+    return module.create(norm, trans, config)
+
+
+def get_normalized_variants(proc, name):
+    """ Normalize 'name' with the default normalization rules and return
+        the ASCII search variants the given analysis module produces.
+    """
+    norm = Transliterator.createFromRules("test_norm", DEFAULT_NORMALIZATION)
+    return proc.get_variants_ascii(norm.transliterate(name).strip())
+
+
+@pytest.mark.parametrize('name,norm', [('12', '12'),
+                                       ('A 34 ', 'A 34'),
+                                       ('34-av', '34-AV')])
+def test_normalize(analyser, name, norm):
+    # normalize() must return the canonical output form of the postcode
+    # (trimmed, upper-cased), not the standard normalized form.
+    assert analyser.normalize(name) == norm
+
+
+@pytest.mark.parametrize('postcode,variants', [('12345', {'12345'}),
+                                               ('AB-998', {'ab 998', 'ab998'}),
+                                               ('23 FGH D3', {'23 fgh d3', '23fgh d3',
+                                                              '23 fghd3', '23fghd3'})])
+def test_get_variants_ascii(analyser, postcode, variants):
+    out = analyser.get_variants_ascii(postcode)
+
+    # Variants must be unique and cover every combination of
+    # optional spaces in the normalized postcode.
+    assert len(out) == len(set(out))
+    assert set(out) == variants