# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for functions to maintain the artificial postcode table.
"""
import subprocess

import pytest

from nominatim_db.tools import postcodes
from nominatim_db.data import country_info
from nominatim_db.db.sql_preprocessor import SQLPreprocessor

import dummy_tokenizer


class MockPostcodeTable:
    """ A location_postcodes table for testing.
    """
    def __init__(self, conn, config):
        self.conn = conn
        SQLPreprocessor(conn, config).run_sql_file(conn, 'functions/postcode_triggers.sql')
        with conn.cursor() as cur:
            cur.execute("""CREATE TABLE location_postcodes (
                               place_id BIGINT,
                               osm_id BIGINT,
                               parent_place_id BIGINT,
                               rank_search SMALLINT,
                               indexed_status SMALLINT,
                               indexed_date TIMESTAMP,
                               country_code varchar(2),
                               postcode TEXT,
                               geometry GEOMETRY(Geometry, 4326),
                               centroid GEOMETRY(Point, 4326))""")
            cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
                           RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;

                           CREATE OR REPLACE FUNCTION get_country_code(place geometry)
                           RETURNS TEXT AS $$ BEGIN
                           RETURN null;
                           END; $$ LANGUAGE plpgsql;
                        """)
            cur.execute("""CREATE OR REPLACE FUNCTION expand_by_meters(geom GEOMETRY, meters FLOAT)
                           RETURNS GEOMETRY AS $$
                           SELECT ST_Envelope(ST_Buffer(geom::geography, meters, 1)::geometry)
                           $$ LANGUAGE sql;""")

        conn.commit()

    def add(self, country, postcode, x, y):
        with self.conn.cursor() as cur:
            cur.execute(
                """INSERT INTO location_postcodes
                       (place_id, indexed_status, country_code, postcode, centroid, geometry)
                     VALUES (nextval('seq_place'), 1, %(cc)s, %(pc)s,
                             ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
                             ST_Expand(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), 0.005))""",
                {'cc': country, 'pc': postcode, 'x': x, 'y': y})

        self.conn.commit()

    @property
    def row_set(self):
        with self.conn.cursor() as cur:
            cur.execute("""SELECT osm_id, country_code, postcode,
                                  ST_X(centroid), ST_Y(centroid)
                           FROM location_postcodes""")
            return set((tuple(row) for row in cur))


@pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table, table_factory):
    country_info.setup_country_config(def_config)
    table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
    return MockPostcodeTable(temp_db_conn, def_config)


@pytest.fixture
def insert_implicit_postcode(placex_table, place_postcode_row):
    """ Insert data into the placex and place table
        which can then be used to compute one postcode.
    """
    def _insert_implicit_postcode(osm_id, country, geometry, postcode, in_placex=False):
        if in_placex:
            placex_table.add(osm_id=osm_id, country=country, geom=geometry,
                             centroid=f'SRID=4326;{geometry}',
                             address={'postcode': postcode})
        else:
            place_postcode_row(osm_id=osm_id, centroid=geometry,
                               country=country, postcode=postcode)

    return _insert_implicit_postcode


@pytest.fixture
def insert_postcode_area(place_postcode_row):
    """ Insert an area around a centroid to the postcode table.
    """
    def _do(osm_id, country, postcode, x, y):
        x1, x2, y1, y2 = x - 0.001, x + 0.001, y - 0.001, y + 0.001
        place_postcode_row(osm_type='R', osm_id=osm_id, postcode=postcode, country=country,
                           centroid=f"POINT({x} {y})",
                           geom=f"POLYGON(({x1} {y1}, {x1} {y2}, {x2} {y2}, {x2} {y1}, {x1} {y1}))")

    return _do


@pytest.fixture
def postcode_update(dsn, temp_db_conn):
    tokenizer = dummy_tokenizer.DummyTokenizer(None)

    def _do(data_path=None):
        with temp_db_conn.cursor() as cur:
            cur.execute("""CREATE TRIGGER location_postcodes_before_update
                            BEFORE UPDATE ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_update()""")
            cur.execute("""CREATE TRIGGER location_postcodes_before_delete
                            BEFORE DELETE ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_delete()""")
            cur.execute("""CREATE TRIGGER location_postcodes_before_insert
                            BEFORE INSERT ON location_postcodes
                            FOR EACH ROW EXECUTE PROCEDURE postcodes_insert()""")
        temp_db_conn.commit()

        postcodes.update_postcodes(dsn, data_path, tokenizer)

    return _do


def test_postcodes_empty(postcode_update, postcode_table, place_postcode_table):
    postcode_update()

    assert not postcode_table.row_set


@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_new_point(postcode_update, postcode_table,
                                 insert_implicit_postcode, in_placex):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '9486', in_placex)
    postcode_table.add('yy', '9486', 99, 34)

    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', '9486', 10, 12), }


def test_postcodes_add_new_area(postcode_update, insert_postcode_area, postcode_table):
    insert_postcode_area(345, 'de', '10445', 23.5, 46.2)

    postcode_update()

    assert postcode_table.row_set == {(345, 'de', '10445', 23.5, 46.2)}


@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_area_and_point(postcode_update, insert_postcode_area,
                                      insert_implicit_postcode, postcode_table, in_placex):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', '10445', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    postcode_update()

    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}


@pytest.mark.parametrize('in_placex', [True, False])
def test_postcodes_add_point_within_area(postcode_update, insert_postcode_area,
                                         insert_implicit_postcode, postcode_table, in_placex):
    insert_implicit_postcode(1, 'xx', 'POINT(23.5 46.2)', '10446', in_placex)
    insert_postcode_area(345, 'xx', '10445', 23.5, 46.2)

    postcode_update()

    assert postcode_table.row_set == {(345, 'xx', '10445', 23.5, 46.2)}


@pytest.mark.parametrize('coords', [(99, 34), (10, 34), (99, 12),
                                    (9, 34), (9, 11), (23, 11)])
def test_postcodes_replace_coordinates(postcode_update, postcode_table, tmp_path,
                                       insert_implicit_postcode, coords):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'AB 4511', *coords)

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}


def test_postcodes_replace_coordinates_close(postcode_update, postcode_table,
                                             insert_implicit_postcode):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'AB 4511', 10, 11.99999999)

    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 11.99999999)}


def test_postcodes_remove_point(postcode_update, postcode_table,
                                insert_implicit_postcode):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')
    postcode_table.add('xx', 'badname', 10, 12)

    postcode_update()

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}


def test_postcodes_ignore_empty_country(postcode_update, postcode_table,
                                        insert_implicit_postcode):
    insert_implicit_postcode(1, None, 'POINT(10 12)', 'AB 4511')
    postcode_update()
    assert not postcode_table.row_set


def test_postcodes_remove_all(postcode_update, postcode_table, place_postcode_table):
    postcode_table.add('ch', '5613', 10, 12)
    postcode_update()

    assert not postcode_table.row_set


def test_postcodes_multi_country(postcode_update, postcode_table,
                                 insert_implicit_postcode):
    insert_implicit_postcode(1, 'de', 'POINT(10 12)', '54451')
    insert_implicit_postcode(2, 'cc', 'POINT(100 56)', 'DD23 T')
    insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', '54452')
    insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', '54452')

    postcode_update()

    assert postcode_table.row_set == {(None, 'de', '54451', 10, 12),
                                      (None, 'de', '54452', 10.3, 11.0),
                                      (None, 'cc', '54452', 10.3, 11.0),
                                      (None, 'cc', 'DD23 T', 100, 56)}


@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(postcode_update, postcode_table, tmp_path,
                          insert_implicit_postcode, gzipped):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    if gzipped:
        subprocess.run(['gzip', str(extfile)])
        assert not extfile.is_file()

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                      (None, 'xx', 'CD 4511', -10, -5)}


def test_postcodes_extern_bad_column(postcode_update, postcode_table, tmp_path,
                                     insert_implicit_postcode):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12)}


def test_postcodes_extern_bad_number(postcode_update, insert_implicit_postcode,
                                     postcode_table, tmp_path):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', 'AB 4511')

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")

    postcode_update(tmp_path)

    assert postcode_table.row_set == {(None, 'xx', 'AB 4511', 10, 12),
                                      (None, 'xx', 'CD 4511', -10, -5)}


def test_can_compute(dsn, table_factory):
    assert not postcodes.can_compute(dsn)
    table_factory('place_postcode')
    assert postcodes.can_compute(dsn)


def test_no_placex_entry(postcode_update, temp_db_cursor, place_postcode_row, postcode_table):
    # Rewrite the get_country_code function to verify its execution.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN
        RETURN 'yy';
        END; $$ LANGUAGE plpgsql;
    """)
    place_postcode_row(centroid='POINT(10 12)', postcode='AB 4511')
    postcode_update()

    assert postcode_table.row_set == {(None, 'yy', 'AB 4511', 10, 12)}


def test_discard_badly_formatted_postcodes(postcode_update, place_postcode_row, postcode_table):
    place_postcode_row(centroid='POINT(10 12)', country='fr', postcode='AB 4511')
    postcode_update()

    assert not postcode_table.row_set
