"""
Tests for functions to maintain the artificial postcode table.
"""
import subprocess

import pytest

from nominatim.tools import postcodes
import dummy_tokenizer

class MockPostcodeTable:
    """ A location_postcode table for testing.
    """
    def __init__(self, conn):
        self.conn = conn
        with conn.cursor() as cur:
            cur.execute("""CREATE TABLE location_postcode (
                               place_id BIGINT,
                               parent_place_id BIGINT,
                               rank_search SMALLINT,
                               rank_address SMALLINT,
                               indexed_status SMALLINT,
                               indexed_date TIMESTAMP,
                               country_code varchar(2),
                               postcode TEXT,
                               geometry GEOMETRY(Geometry, 4326))""")
            cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
                           RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;

                           CREATE OR REPLACE FUNCTION get_country_code(place geometry)
                           RETURNS TEXT AS $$ BEGIN 
                           RETURN null;
                           END; $$ LANGUAGE plpgsql;
                        """)
        conn.commit()

    def add(self, country, postcode, x, y):
        with self.conn.cursor() as cur:
            cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
                                                          country_code, postcode,
                                                          geometry)
                           VALUES (nextval('seq_place'), 1, %s, %s,
                                   'SRID=4326;POINT(%s %s)')""",
                        (country, postcode, x, y))
        self.conn.commit()


    @property
    def row_set(self):
        with self.conn.cursor() as cur:
            cur.execute("""SELECT country_code, postcode,
                                  ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode""")
            return set((tuple(row) for row in cur))


@pytest.fixture
def tokenizer():
    return dummy_tokenizer.DummyTokenizer(None, None)

@pytest.fixture
def postcode_table(temp_db_conn, placex_table, word_table):
    return MockPostcodeTable(temp_db_conn)


def test_postcodes_empty(dsn, postcode_table, place_table,
                         tmp_path, tokenizer):
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set


def test_postcodes_add_new(dsn, postcode_table, tmp_path,
                           insert_implicit_postcode, tokenizer):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='9486'))
    postcode_table.add('yy', '9486', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', '9486', 10, 12), }


def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
                                       insert_implicit_postcode, tokenizer):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
    postcode_table.add('xx', 'AB 4511', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}


def test_postcodes_replace_coordinates_close(dsn, postcode_table, tmp_path,
                                             insert_implicit_postcode, tokenizer):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
    postcode_table.add('xx', 'AB 4511', 10, 11.99999)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999)}


def test_postcodes_remove(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
    postcode_table.add('xx', 'badname', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}


def test_postcodes_ignore_empty_country(dsn, postcode_table, tmp_path,
                                        insert_implicit_postcode, tokenizer):
    insert_implicit_postcode(1, None, 'POINT(10 12)', dict(postcode='AB 4511'))
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)
    assert not postcode_table.row_set


def test_postcodes_remove_all(dsn, postcode_table, place_table,
                              tmp_path, tokenizer):
    postcode_table.add('ch', '5613', 10, 12)
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert not postcode_table.row_set


def test_postcodes_multi_country(dsn, postcode_table, tmp_path,
                                 insert_implicit_postcode, tokenizer):
    insert_implicit_postcode(1, 'de', 'POINT(10 12)', dict(postcode='54451'))
    insert_implicit_postcode(2, 'cc', 'POINT(100 56)', dict(postcode='DD23 T'))
    insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', dict(postcode='54452'))
    insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', dict(postcode='54452'))

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('de', '54451', 10, 12),
                                      ('de', '54452', 10.3, 11.0),
                                      ('cc', '54452', 10.3, 11.0),
                                      ('cc', 'DD23 T', 100, 56)}


@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer, gzipped):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    if gzipped:
        subprocess.run(['gzip', str(extfile)])
        assert not extfile.is_file()

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}


def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path, 
                                     insert_implicit_postcode, tokenizer):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}


def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
                                     postcode_table, tmp_path, tokenizer):
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}

def test_can_compute(dsn, table_factory):
    assert not postcodes.can_compute(dsn)
    table_factory('place')
    assert postcodes.can_compute(dsn)

def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer):
    #Rewrite the get_country_code function to verify its execution.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN 
        RETURN 'fr';
        END; $$ LANGUAGE plpgsql;
    """)
    place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('fr', 'AB 4511', 10, 12)}

@pytest.fixture
def insert_implicit_postcode(placex_table, place_row):
    """
        Inserts data into the placex and place table
        which can then be used to compute one postcode.
    """
    def _insert_implicit_postcode(osm_id, country, geometry, address):
        placex_table.add(osm_id=osm_id, country=country, geom=geometry)
        place_row(osm_id=osm_id, geom='SRID=4326;'+geometry, address=address)

    return _insert_implicit_postcode
