1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions for importing, updating and otherwise maintaining the table
 
   9 of artificial postcode centroids.
 
  11 from typing import Optional, Tuple, Dict, List, TextIO
 
  12 from collections import defaultdict
 
  13 from pathlib import Path
 
  17 from math import isfinite
 
  19 from psycopg import sql as pysql
 
  21 from ..db.connection import connect, Connection, table_exists
 
  22 from ..utils.centroid import PointsCentroid
 
  23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
 
  24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
 
  26 LOG = logging.getLogger()
 
  29 def _to_float(numstr: str, max_value: float) -> float:
 
  30     """ Convert the number in string into a float. The number is expected
 
  31         to be in the range of [-max_value, max_value]. Otherwise rises a
 
  35     if not isfinite(num) or num <= -max_value or num >= max_value:
 
  41 class _PostcodeCollector:
 
  42     """ Collector for postcodes of a single country.
 
  45     def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
 
  46         self.country = country
 
  47         self.matcher = matcher
 
  48         self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
 
  49         self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
 
  51     def add(self, postcode: str, x: float, y: float) -> None:
 
  52         """ Add the given postcode to the collection cache. If the postcode
 
  53             already existed, it is overwritten with the new centroid.
 
  55         if self.matcher is not None:
 
  56             normalized: Optional[str]
 
  57             if self.normalization_cache and self.normalization_cache[0] == postcode:
 
  58                 normalized = self.normalization_cache[1]
 
  60                 match = self.matcher.match(postcode)
 
  61                 normalized = self.matcher.normalize(match) if match else None
 
  62                 self.normalization_cache = (postcode, normalized)
 
  65                 self.collected[normalized] += (x, y)
 
  67     def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
 
  68         """ Update postcodes for the country from the postcodes selected so far
 
  69             as well as any externally supplied postcodes.
 
  71         self._update_from_external(analyzer, project_dir)
 
  72         to_add, to_delete, to_update = self._compute_changes(conn)
 
  74         LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
 
  75                  self.country, len(to_add), len(to_delete), len(to_update))
 
  77         with conn.cursor() as cur:
 
  79                 cur.executemany(pysql.SQL(
 
  80                     """INSERT INTO location_postcode
 
  81                          (place_id, indexed_status, country_code,
 
  83                        VALUES (nextval('seq_place'), 1, {}, %s,
 
  84                                ST_SetSRID(ST_MakePoint(%s, %s), 4326))
 
  85                     """).format(pysql.Literal(self.country)),
 
  88                 cur.execute("""DELETE FROM location_postcode
 
  89                                WHERE country_code = %s and postcode = any(%s)
 
  90                             """, (self.country, to_delete))
 
  93                     pysql.SQL("""UPDATE location_postcode
 
  94                                  SET indexed_status = 2,
 
  95                                      geometry = ST_SetSRID(ST_Point(%s, %s), 4326)
 
  96                                  WHERE country_code = {} and postcode = %s
 
  97                               """).format(pysql.Literal(self.country)),
 
 100     def _compute_changes(
 
 101             self, conn: Connection
 
 102             ) -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
 
 103         """ Compute which postcodes from the collected postcodes have to be
 
 104             added or modified and which from the location_postcode table
 
 109         with conn.cursor() as cur:
 
 110             cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
 
 111                            FROM location_postcode
 
 112                            WHERE country_code = %s""",
 
 114             for postcode, x, y in cur:
 
 115                 pcobj = self.collected.pop(postcode, None)
 
 117                     newx, newy = pcobj.centroid()
 
 118                     if (x - newx) > 0.0000001 or (y - newy) > 0.0000001:
 
 119                         to_update.append((newx, newy, postcode))
 
 121                     to_delete.append(postcode)
 
 123         to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
 
 124         self.collected = defaultdict(PointsCentroid)
 
 126         return to_add, to_delete, to_update
 
 128     def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
 
 129         """ Look for an external postcode file for the active country in
 
 130             the project directory and add missing postcodes when found.
 
 132         csvfile = self._open_external(project_dir)
 
 137             reader = csv.DictReader(csvfile)
 
 139                 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
 
 140                     LOG.warning("Bad format for external postcode file for country '%s'."
 
 141                                 " Ignored.", self.country)
 
 143                 postcode = analyzer.normalize_postcode(row['postcode'])
 
 144                 if postcode not in self.collected:
 
 146                         # Do float conversation separately, it might throw
 
 147                         centroid = (_to_float(row['lon'], 180),
 
 148                                     _to_float(row['lat'], 90))
 
 149                         self.collected[postcode] += centroid
 
 151                         LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
 
 152                                     row['lat'], row['lon'], self.country)
 
 157     def _open_external(self, project_dir: Path) -> Optional[TextIO]:
 
 158         fname = project_dir / f'{self.country}_postcodes.csv'
 
 161             LOG.info("Using external postcode file '%s'.", fname)
 
 162             return open(fname, 'r', encoding='utf-8')
 
 164         fname = project_dir / f'{self.country}_postcodes.csv.gz'
 
 167             LOG.info("Using external postcode file '%s'.", fname)
 
 168             return gzip.open(fname, 'rt')
 
 173 def update_postcodes(dsn: str, project_dir: Path, tokenizer: AbstractTokenizer) -> None:
 
 174     """ Update the table of artificial postcodes.
 
 176         Computes artificial postcode centroids from the placex table,
 
 177         potentially enhances it with external data and then updates the
 
 178         postcodes in the table 'location_postcode'.
 
 180     matcher = PostcodeFormatter()
 
 181     with tokenizer.name_analyzer() as analyzer:
 
 182         with connect(dsn) as conn:
 
 183             # First get the list of countries that currently have postcodes.
 
 184             # (Doing this before starting to insert, so it is fast on import.)
 
 185             with conn.cursor() as cur:
 
 186                 cur.execute("SELECT DISTINCT country_code FROM location_postcode")
 
 187                 todo_countries = set((row[0] for row in cur))
 
 189             # Recompute the list of valid postcodes from placex.
 
 190             with conn.cursor(name="placex_postcodes") as cur:
 
 192                 SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
 
 194                         COALESCE(plx.country_code,
 
 195                                  get_country_code(ST_Centroid(pl.geometry))) as cc,
 
 196                         pl.address->'postcode' as pc,
 
 197                         COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
 
 198                       FROM place AS pl LEFT OUTER JOIN placex AS plx
 
 199                              ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
 
 200                     WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
 
 201                 WHERE pc IS NOT null AND cc IS NOT null
 
 206                 for country, postcode, x, y in cur:
 
 207                     if collector is None or country != collector.country:
 
 208                         if collector is not None:
 
 209                             collector.commit(conn, analyzer, project_dir)
 
 210                         collector = _PostcodeCollector(country, matcher.get_matcher(country))
 
 211                         todo_countries.discard(country)
 
 212                     collector.add(postcode, x, y)
 
 214                 if collector is not None:
 
 215                     collector.commit(conn, analyzer, project_dir)
 
 217             # Now handle any countries that are only in the postcode table.
 
 218             for country in todo_countries:
 
 219                 fmt = matcher.get_matcher(country)
 
 220                 _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)
 
 224         analyzer.update_postcodes_from_db()
 
 227 def can_compute(dsn: str) -> bool:
 
 229         Check that the place table exists so that
 
 230         postcodes can be computed.
 
 232     with connect(dsn) as conn:
 
 233         return table_exists(conn, 'place')