1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Tests for functions to maintain the artificial postcode table.
 
  14 from nominatim_db.tools import postcodes
 
  15 from nominatim_db.data import country_info
 
  16 import dummy_tokenizer
 
  19 class MockPostcodeTable:
 
  20     """ A location_postcode table for testing.
 
  22     def __init__(self, conn):
 
  24         with conn.cursor() as cur:
 
  25             cur.execute("""CREATE TABLE location_postcode (
 
  27                                parent_place_id BIGINT,
 
  29                                rank_address SMALLINT,
 
  30                                indexed_status SMALLINT,
 
  31                                indexed_date TIMESTAMP,
 
  32                                country_code varchar(2),
 
  34                                geometry GEOMETRY(Geometry, 4326))""")
 
  35             cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
 
  36                            RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
 
  38                            CREATE OR REPLACE FUNCTION get_country_code(place geometry)
 
  39                            RETURNS TEXT AS $$ BEGIN
 
  41                            END; $$ LANGUAGE plpgsql;
 
  45     def add(self, country, postcode, x, y):
 
  46         with self.conn.cursor() as cur:
 
  47             cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
 
  48                                                           country_code, postcode,
 
  50                            VALUES (nextval('seq_place'), 1, %s, %s,
 
  51                                    ST_SetSRID(ST_MakePoint(%s, %s), 4326))""",
 
  52                         (country, postcode, x, y))
 
  57         with self.conn.cursor() as cur:
 
  58             cur.execute("""SELECT country_code, postcode,
 
  59                                   ST_X(geometry), ST_Y(geometry)
 
  60                            FROM location_postcode""")
 
  61             return set((tuple(row) for row in cur))
 
  66     return dummy_tokenizer.DummyTokenizer(None)
 
  70 def postcode_table(def_config, temp_db_conn, placex_table):
 
  71     country_info.setup_country_config(def_config)
 
  72     return MockPostcodeTable(temp_db_conn)
 
  76 def insert_implicit_postcode(placex_table, place_row):
 
  78         Inserts data into the placex and place table
 
  79         which can then be used to compute one postcode.
 
  81     def _insert_implicit_postcode(osm_id, country, geometry, address):
 
  82         placex_table.add(osm_id=osm_id, country=country, geom=geometry)
 
  83         place_row(osm_id=osm_id, geom='SRID=4326;'+geometry, address=address)
 
  85     return _insert_implicit_postcode
 
  88 def test_postcodes_empty(dsn, postcode_table, place_table, tokenizer):
 
  89     postcodes.update_postcodes(dsn, None, tokenizer)
 
  91     assert not postcode_table.row_set
 
  94 def test_postcodes_add_new(dsn, postcode_table, insert_implicit_postcode, tokenizer):
 
  95     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='9486'))
 
  96     postcode_table.add('yy', '9486', 99, 34)
 
  98     postcodes.update_postcodes(dsn, None, tokenizer)
 
 100     assert postcode_table.row_set == {('xx', '9486', 10, 12), }
 
 103 @pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
 
 104                                     (9, 34), (9, 11), (23, 11)])
 
 105 def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
 
 106                                        insert_implicit_postcode, tokenizer, coords):
 
 107     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
 
 108     postcode_table.add('xx', 'AB 4511', *coords)
 
 110     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
 
 112     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
 
 115 def test_postcodes_replace_coordinates_close(dsn, postcode_table,
 
 116                                              insert_implicit_postcode, tokenizer):
 
 117     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
 
 118     postcode_table.add('xx', 'AB 4511', 10, 11.99999999)
 
 120     postcodes.update_postcodes(dsn, None, tokenizer)
 
 122     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999999)}
 
 125 def test_postcodes_remove(dsn, postcode_table,
 
 126                           insert_implicit_postcode, tokenizer):
 
 127     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
 
 128     postcode_table.add('xx', 'badname', 10, 12)
 
 130     postcodes.update_postcodes(dsn, None, tokenizer)
 
 132     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
 
 135 def test_postcodes_ignore_empty_country(dsn, postcode_table,
 
 136                                         insert_implicit_postcode, tokenizer):
 
 137     insert_implicit_postcode(1, None, 'POINT(10 12)', dict(postcode='AB 4511'))
 
 138     postcodes.update_postcodes(dsn, None, tokenizer)
 
 139     assert not postcode_table.row_set
 
 142 def test_postcodes_remove_all(dsn, postcode_table, place_table, tokenizer):
 
 143     postcode_table.add('ch', '5613', 10, 12)
 
 144     postcodes.update_postcodes(dsn, None, tokenizer)
 
 146     assert not postcode_table.row_set
 
 149 def test_postcodes_multi_country(dsn, postcode_table,
 
 150                                  insert_implicit_postcode, tokenizer):
 
 151     insert_implicit_postcode(1, 'de', 'POINT(10 12)', dict(postcode='54451'))
 
 152     insert_implicit_postcode(2, 'cc', 'POINT(100 56)', dict(postcode='DD23 T'))
 
 153     insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', dict(postcode='54452'))
 
 154     insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', dict(postcode='54452'))
 
 156     postcodes.update_postcodes(dsn, None, tokenizer)
 
 158     assert postcode_table.row_set == {('de', '54451', 10, 12),
 
 159                                       ('de', '54452', 10.3, 11.0),
 
 160                                       ('cc', '54452', 10.3, 11.0),
 
 161                                       ('cc', 'DD23 T', 100, 56)}
 
 164 @pytest.mark.parametrize("gzipped", [True, False])
 
 165 def test_postcodes_extern(dsn, postcode_table, tmp_path,
 
 166                           insert_implicit_postcode, tokenizer, gzipped):
 
 167     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
 
 169     extfile = tmp_path / 'xx_postcodes.csv'
 
 170     extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
 
 173         subprocess.run(['gzip', str(extfile)])
 
 174         assert not extfile.is_file()
 
 176     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
 
 178     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
 
 179                                       ('xx', 'CD 4511', -10, -5)}
 
 182 def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path,
 
 183                                      insert_implicit_postcode, tokenizer):
 
 184     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
 
 186     extfile = tmp_path / 'xx_postcodes.csv'
 
 187     extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
 
 189     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
 
 191     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
 
 194 def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
 
 195                                      postcode_table, tmp_path, tokenizer):
 
 196     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
 
 198     extfile = tmp_path / 'xx_postcodes.csv'
 
 199     extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")
 
 201     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
 
 203     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
 
 204                                       ('xx', 'CD 4511', -10, -5)}
 
 207 def test_can_compute(dsn, table_factory):
 
 208     assert not postcodes.can_compute(dsn)
 
 209     table_factory('place')
 
 210     assert postcodes.can_compute(dsn)
 
 213 def test_no_placex_entry(dsn, temp_db_cursor, place_row, postcode_table, tokenizer):
 
 214     # Rewrite the get_country_code function to verify its execution.
 
 215     temp_db_cursor.execute("""
 
 216         CREATE OR REPLACE FUNCTION get_country_code(place geometry)
 
 217         RETURNS TEXT AS $$ BEGIN
 
 219         END; $$ LANGUAGE plpgsql;
 
 221     place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
 
 222     postcodes.update_postcodes(dsn, None, tokenizer)
 
 224     assert postcode_table.row_set == {('yy', 'AB 4511', 10, 12)}
 
 227 def test_discard_badly_formatted_postcodes(dsn, temp_db_cursor, place_row,
 
 228                                            postcode_table, tokenizer):
 
 229     # Rewrite the get_country_code function to verify its execution.
 
 230     temp_db_cursor.execute("""
 
 231         CREATE OR REPLACE FUNCTION get_country_code(place geometry)
 
 232         RETURNS TEXT AS $$ BEGIN
 
 234         END; $$ LANGUAGE plpgsql;
 
 236     place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
 
 237     postcodes.update_postcodes(dsn, None, tokenizer)
 
 239     assert not postcode_table.row_set