1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions for importing, updating and otherwise maintaining the table
 
   9 of artificial postcode centroids.
 
  11 from typing import Optional, Tuple, Dict, List, TextIO
 
  12 from collections import defaultdict
 
  13 from pathlib import Path
 
  17 from math import isfinite
 
  19 from psycopg import sql as pysql
 
  21 from ..db.connection import connect, Connection, table_exists
 
  22 from ..utils.centroid import PointsCentroid
 
  23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
 
  24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
 
  26 LOG = logging.getLogger()
 
  29 def _to_float(numstr: str, max_value: float) -> float:
 
  30     """ Convert the number in string into a float. The number is expected
 
  31         to be in the range of [-max_value, max_value]. Otherwise rises a
 
  35     if not isfinite(num) or num <= -max_value or num >= max_value:
 
  41 class _PostcodeCollector:
 
  42     """ Collector for postcodes of a single country.
 
  45     def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
 
  46         self.country = country
 
  47         self.matcher = matcher
 
  48         self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
 
  49         self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
 
  51     def add(self, postcode: str, x: float, y: float) -> None:
 
  52         """ Add the given postcode to the collection cache. If the postcode
 
  53             already existed, it is overwritten with the new centroid.
 
  55         if self.matcher is not None:
 
  56             normalized: Optional[str]
 
  57             if self.normalization_cache and self.normalization_cache[0] == postcode:
 
  58                 normalized = self.normalization_cache[1]
 
  60                 match = self.matcher.match(postcode)
 
  61                 normalized = self.matcher.normalize(match) if match else None
 
  62                 self.normalization_cache = (postcode, normalized)
 
  65                 self.collected[normalized] += (x, y)
 
  67     def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
 
  68                project_dir: Optional[Path]) -> None:
 
  69         """ Update postcodes for the country from the postcodes selected so far.
 
  71             When 'project_dir' is set, then any postcode files found in this
 
  72             directory are taken into account as well.
 
  74         if project_dir is not None:
 
  75             self._update_from_external(analyzer, project_dir)
 
  76         to_add, to_delete, to_update = self._compute_changes(conn)
 
  78         LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
 
  79                  self.country, len(to_add), len(to_delete), len(to_update))
 
  81         with conn.cursor() as cur:
 
  83                 cur.executemany(pysql.SQL(
 
  84                     """INSERT INTO location_postcode
 
  85                          (place_id, indexed_status, country_code,
 
  87                        VALUES (nextval('seq_place'), 1, {}, %s,
 
  88                                ST_SetSRID(ST_MakePoint(%s, %s), 4326))
 
  89                     """).format(pysql.Literal(self.country)),
 
  92                 cur.execute("""DELETE FROM location_postcode
 
  93                                WHERE country_code = %s and postcode = any(%s)
 
  94                             """, (self.country, to_delete))
 
  97                     pysql.SQL("""UPDATE location_postcode
 
  98                                  SET indexed_status = 2,
 
  99                                      geometry = ST_SetSRID(ST_Point(%s, %s), 4326)
 
 100                                  WHERE country_code = {} and postcode = %s
 
 101                               """).format(pysql.Literal(self.country)),
 
 104     def _compute_changes(
 
 105             self, conn: Connection
 
 106             ) -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
 
 107         """ Compute which postcodes from the collected postcodes have to be
 
 108             added or modified and which from the location_postcode table
 
 113         with conn.cursor() as cur:
 
 114             cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
 
 115                            FROM location_postcode
 
 116                            WHERE country_code = %s""",
 
 118             for postcode, x, y in cur:
 
 119                 pcobj = self.collected.pop(postcode, None)
 
 121                     newx, newy = pcobj.centroid()
 
 122                     if abs(x - newx) > 0.0000001 or abs(y - newy) > 0.0000001:
 
 123                         to_update.append((newx, newy, postcode))
 
 125                     to_delete.append(postcode)
 
 127         to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
 
 128         self.collected = defaultdict(PointsCentroid)
 
 130         return to_add, to_delete, to_update
 
 132     def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
 
 133         """ Look for an external postcode file for the active country in
 
 134             the project directory and add missing postcodes when found.
 
 136         csvfile = self._open_external(project_dir)
 
 141             reader = csv.DictReader(csvfile)
 
 143                 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
 
 144                     LOG.warning("Bad format for external postcode file for country '%s'."
 
 145                                 " Ignored.", self.country)
 
 147                 postcode = analyzer.normalize_postcode(row['postcode'])
 
 148                 if postcode not in self.collected:
 
 150                         # Do float conversation separately, it might throw
 
 151                         centroid = (_to_float(row['lon'], 180),
 
 152                                     _to_float(row['lat'], 90))
 
 153                         self.collected[postcode] += centroid
 
 155                         LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
 
 156                                     row['lat'], row['lon'], self.country)
 
 161     def _open_external(self, project_dir: Path) -> Optional[TextIO]:
 
 162         fname = project_dir / f'{self.country}_postcodes.csv'
 
 165             LOG.info("Using external postcode file '%s'.", fname)
 
 166             return open(fname, 'r', encoding='utf-8')
 
 168         fname = project_dir / f'{self.country}_postcodes.csv.gz'
 
 171             LOG.info("Using external postcode file '%s'.", fname)
 
 172             return gzip.open(fname, 'rt')
 
 177 def update_postcodes(dsn: str, project_dir: Optional[Path], tokenizer: AbstractTokenizer) -> None:
 
 178     """ Update the table of artificial postcodes.
 
 180         Computes artificial postcode centroids from the placex table,
 
 181         potentially enhances it with external data and then updates the
 
 182         postcodes in the table 'location_postcode'.
 
 184     matcher = PostcodeFormatter()
 
 185     with tokenizer.name_analyzer() as analyzer:
 
 186         with connect(dsn) as conn:
 
 187             # First get the list of countries that currently have postcodes.
 
 188             # (Doing this before starting to insert, so it is fast on import.)
 
 189             with conn.cursor() as cur:
 
 190                 cur.execute("SELECT DISTINCT country_code FROM location_postcode")
 
 191                 todo_countries = set((row[0] for row in cur))
 
 193             # Recompute the list of valid postcodes from placex.
 
 194             with conn.cursor(name="placex_postcodes") as cur:
 
 196                 SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
 
 198                         COALESCE(plx.country_code,
 
 199                                  get_country_code(ST_Centroid(pl.geometry))) as cc,
 
 200                         pl.address->'postcode' as pc,
 
 201                         COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
 
 202                       FROM place AS pl LEFT OUTER JOIN placex AS plx
 
 203                              ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
 
 204                     WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
 
 205                 WHERE pc IS NOT null AND cc IS NOT null
 
 210                 for country, postcode, x, y in cur:
 
 211                     if collector is None or country != collector.country:
 
 212                         if collector is not None:
 
 213                             collector.commit(conn, analyzer, project_dir)
 
 214                         collector = _PostcodeCollector(country, matcher.get_matcher(country))
 
 215                         todo_countries.discard(country)
 
 216                     collector.add(postcode, x, y)
 
 218                 if collector is not None:
 
 219                     collector.commit(conn, analyzer, project_dir)
 
 221             # Now handle any countries that are only in the postcode table.
 
 222             for country in todo_countries:
 
 223                 fmt = matcher.get_matcher(country)
 
 224                 _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)
 
 228         analyzer.update_postcodes_from_db()
 
 231 def can_compute(dsn: str) -> bool:
 
 233         Check that the place table exists so that
 
 234         postcodes can be computed.
 
 236     with connect(dsn) as conn:
 
 237         return table_exists(conn, 'place')