# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of search for an address (search with housenumber).
"""
from typing import cast, List, AsyncIterator

import sqlalchemy as sa

from . import base
from ...typing import SaBind, SaExpression, SaColumn, SaFromClause, SaScalarSelect
from ...types import SearchDetails, Bbox
from ...sql.sqlalchemy_types import Geometry
from ...connection import SearchConnection
from ... import results as nres
from ..db_search_fields import SearchData


LIMIT_PARAM: SaBind = sa.bindparam('limit')
MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank')
MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry)
VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry)
NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry)
NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius')
COUNTRIES_PARAM: SaBind = sa.bindparam('countries')


def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery':
    """ Create a subselect that returns the given list of integers
        as rows in the column 'nr'.
    """
    vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\
             .table_valued(sa.column('value', type_=sa.JSON))
    return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery()


def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
    pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber)
    return sa.case(
            (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
            else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid')


def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
                                 numerals: List[int], details: SearchDetails) -> SaScalarSelect:
    all_ids = sa.func.ArrayAgg(table.c.place_id)
    sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)

    if len(numerals) == 1:
        sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
                 .where((numerals[0] - table.c.startnumber) % table.c.step == 0)
    else:
        sql = sql.where(sa.or_(
                *(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
                          (n - table.c.startnumber) % table.c.step == 0)
                  for n in numerals)))

    if details.excluded:
        sql = sql.where(base.exclude_places(table))

    return sql.scalar_subquery()


async def _get_placex_housenumbers(conn: SearchConnection,
                                   place_ids: List[int],
                                   details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    t = conn.t.placex
    sql = base.select_placex(t).add_columns(t.c.importance)\
                               .where(t.c.place_id.in_(place_ids))

    if details.geometry_output:
        sql = base.add_geometry_columns(sql, t.c.geometry, details)

    for row in await conn.execute(sql):
        result = nres.create_from_placex_row(row, nres.SearchResult)
        result.bbox = Bbox.from_wkb(row.bbox)
        yield result


async def _get_osmline(conn: SearchConnection, place_ids: List[int],
                       numerals: List[int],
                       details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    t = conn.t.osmline

    values = _int_list_to_subquery(numerals)
    sql = sa.select(t.c.place_id, t.c.osm_id,
                    t.c.parent_place_id, t.c.address,
                    values.c.nr.label('housenumber'),
                    _interpolated_position(t, values.c.nr),
                    t.c.postcode, t.c.country_code)\
            .where(t.c.place_id.in_(place_ids))\
            .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))

    if details.geometry_output:
        sub = sql.subquery()
        sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)

    for row in await conn.execute(sql):
        yield nres.create_from_osmline_row(row, nres.SearchResult)


async def _get_tiger(conn: SearchConnection, place_ids: List[int],
                     numerals: List[int], osm_id: int,
                     details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
    t = conn.t.tiger
    values = _int_list_to_subquery(numerals)
    sql = sa.select(t.c.place_id, t.c.parent_place_id,
                    sa.literal('W').label('osm_type'),
                    sa.literal(osm_id).label('osm_id'),
                    values.c.nr.label('housenumber'),
                    _interpolated_position(t, values.c.nr),
                    t.c.postcode)\
            .where(t.c.place_id.in_(place_ids))\
            .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))

    if details.geometry_output:
        sub = sql.subquery()
        sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details)

    for row in await conn.execute(sql):
        yield nres.create_from_tiger_row(row, nres.SearchResult)


class AddressSearch(base.AbstractSearch):
    """ Generic search for an address or named place.
    """
    SEARCH_PRIO = 1

    def __init__(self, extra_penalty: float, sdata: SearchData,
                 expected_count: int, has_address_terms: bool) -> None:
        assert sdata.housenumbers
        super().__init__(sdata.penalty + extra_penalty)
        self.countries = sdata.countries
        self.postcodes = sdata.postcodes
        self.housenumbers = sdata.housenumbers
        self.qualifiers = sdata.qualifiers
        self.lookups = sdata.lookups
        self.rankings = sdata.rankings
        self.expected_count = expected_count
        self.has_address_terms = has_address_terms

    def _inner_search_name_cte(self, conn: SearchConnection,
                               details: SearchDetails) -> 'sa.CTE':
        """ Create a subquery that preselects the rows in the search_name
            table.
        """
        t = conn.t.search_name

        penalty: SaExpression = sa.literal(self.penalty)
        for ranking in self.rankings:
            penalty += ranking.sql_penalty(t)

        sql = sa.select(t.c.place_id, t.c.search_rank, t.c.address_rank,
                        t.c.country_code, t.c.centroid,
                        t.c.name_vector, t.c.nameaddress_vector,
                        sa.case((t.c.importance > 0, t.c.importance),
                                else_=0.40001-(sa.cast(t.c.search_rank, sa.Float())/75))
                          .label('importance'),
                        penalty.label('penalty'))

        for lookup in self.lookups:
            sql = sql.where(lookup.sql_condition(t))

        if self.countries:
            sql = sql.where(t.c.country_code.in_(self.countries.values))

        if self.postcodes:
            if self.expected_count > 10000:
                tpc = conn.t.postcode
                sql = sql.where(sa.select(tpc.c.postcode)
                                  .where(tpc.c.postcode.in_(self.postcodes.values))
                                  .where(tpc.c.country_code == t.c.country_code)
                                  .where(t.c.centroid.within_distance(tpc.c.geometry, 0.4))
                                  .exists())

        if details.viewbox is not None:
            if details.bounded_viewbox:
                sql = sql.where(t.c.centroid
                                   .intersects(VIEWBOX_PARAM,
                                               use_index=details.viewbox.area < 0.2))

        if details.near is not None and details.near_radius is not None:
            if details.near_radius < 0.1:
                sql = sql.where(t.c.centroid.within_distance(NEAR_PARAM,
                                                             NEAR_RADIUS_PARAM))
            else:
                sql = sql.where(t.c.centroid
                                 .ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM)

        if self.has_address_terms:
            sql = sql.where(t.c.address_rank.between(16, 30))
        else:
            # If no further address terms are given, then the base street must
            # be in the name. No search for named POIs with the given house number.
            sql = sql.where(t.c.address_rank.between(16, 27))

        inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery()

        sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank,
                        inner.c.country_code, inner.c.centroid, inner.c.importance,
                        inner.c.penalty)

        return sql.cte('searches')

    async def lookup(self, conn: SearchConnection,
                     details: SearchDetails) -> nres.SearchResults:
        """ Find results for the search in the database.
        """
        t = conn.t.placex
        tsearch = self._inner_search_name_cte(conn, details)

        sql = base.select_placex(t).join(tsearch, t.c.place_id == tsearch.c.place_id)

        if details.geometry_output:
            sql = base.add_geometry_columns(sql, t.c.geometry, details)

        penalty: SaExpression = tsearch.c.penalty

        if self.postcodes:
            tpc = conn.t.postcode
            pcs = self.postcodes.values

            pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)
                                            * (tpc.c.rank_search - 19)))\
                        .where(tpc.c.postcode.in_(pcs))\
                        .where(tpc.c.country_code == t.c.country_code)\
                        .scalar_subquery()
            penalty += sa.case((t.c.postcode.in_(pcs), 0.0),
                               else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0)))

        if details.viewbox is not None and not details.bounded_viewbox:
            penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0),
                               (t.c.geometry.intersects(VIEWBOX2_PARAM, use_index=False), 0.5),
                               else_=1.0)

        if details.near is not None:
            sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM))
                                  .label('importance'))
            sql = sql.order_by(sa.desc(sa.text('importance')))
        else:
            sql = sql.order_by(penalty - tsearch.c.importance)
            sql = sql.add_columns(tsearch.c.importance)

        sql = sql.add_columns(penalty.label('accuracy'))\
                 .order_by(sa.text('accuracy'))

        hnr_list = '|'.join(self.housenumbers.values)

        if self.has_address_terms:
            sql = sql.where(sa.or_(tsearch.c.address_rank < 30,
                                   sa.func.RegexpWord(hnr_list, t.c.housenumber)))

        inner = sql.subquery()

        # Housenumbers from placex
        thnr = conn.t.placex.alias('hnr')
        pid_list = sa.func.ArrayAgg(thnr.c.place_id)
        place_sql = sa.select(pid_list)\
                      .where(thnr.c.parent_place_id == inner.c.place_id)\
                      .where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\
                      .where(thnr.c.linked_place_id == None)\
                      .where(thnr.c.indexed_status == 0)

        if details.excluded:
            place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded')))
        if self.qualifiers:
            place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr))

        numerals = [int(n) for n in self.housenumbers.values
                    if n.isdigit() and len(n) < 8]
        interpol_sql: SaColumn
        tiger_sql: SaColumn
        if numerals and \
           (not self.qualifiers or ('place', 'house') in self.qualifiers.values):
            # Housenumbers from interpolations
            interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner,
                                                        numerals, details)
            # Housenumbers from Tiger
            tiger_sql = sa.case((inner.c.country_code == 'us',
                                 _make_interpolation_subquery(conn.t.tiger, inner,
                                                              numerals, details)
                                 ), else_=None)
        else:
            interpol_sql = sa.null()
            tiger_sql = sa.null()

        unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'),
                           interpol_sql.label('interpol_hnr'),
                           tiger_sql.label('tiger_hnr')).subquery('unsort')
        sql = sa.select(unsort)\
                .order_by(unsort.c.accuracy +
                          sa.case((unsort.c.placex_hnr != None, 0),
                                  (unsort.c.interpol_hnr != None, 0),
                                  (unsort.c.tiger_hnr != None, 0),
                                  else_=1),
                          sa.case((unsort.c.placex_hnr != None, 1),
                                  (unsort.c.interpol_hnr != None, 2),
                                  (unsort.c.tiger_hnr != None, 3),
                                  else_=4))

        sql = sql.limit(LIMIT_PARAM)

        bind_params = {
            'limit': details.max_results,
            'min_rank': details.min_rank,
            'max_rank': details.max_rank,
            'viewbox': details.viewbox,
            'viewbox2': details.viewbox_x2,
            'near': details.near,
            'near_radius': details.near_radius,
            'excluded': details.excluded,
            'countries': details.countries
        }

        results = nres.SearchResults()
        for row in await conn.execute(sql, bind_params):
            result = nres.create_from_placex_row(row, nres.SearchResult)
            result.bbox = Bbox.from_wkb(row.bbox)
            result.accuracy = row.accuracy
            if row.rank_address < 30:
                if row.placex_hnr:
                    subs = _get_placex_housenumbers(conn, row.placex_hnr, details)
                elif row.interpol_hnr:
                    subs = _get_osmline(conn, row.interpol_hnr, numerals, details)
                elif row.tiger_hnr:
                    subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details)
                else:
                    subs = None

                if subs is not None:
                    async for sub in subs:
                        assert sub.housenumber
                        sub.accuracy = result.accuracy
                        if not any(nr in self.housenumbers.values
                                   for nr in sub.housenumber.split(';')):
                            sub.accuracy += 0.6
                        results.append(sub)

                # Only add the street as a result, if it meets all other
                # filter conditions.
                if (not details.excluded or result.place_id not in details.excluded)\
                   and (not self.qualifiers or result.category in self.qualifiers.values)\
                   and result.rank_address >= details.min_rank:
                    result.accuracy += 1.0  # penalty for missing housenumber
                    results.append(result)
            else:
                results.append(result)

        return results
