1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions for importing, updating and otherwise maintaining the table
 
   9 of artificial postcode centroids.
 
  11 from typing import Optional, Tuple, Dict, List, TextIO
 
  12 from collections import defaultdict
 
  13 from pathlib import Path
 
  17 from math import isfinite
 
  19 from psycopg2 import sql as pysql
 
  21 from nominatim.db.connection import connect, Connection
 
  22 from nominatim.utils.centroid import PointsCentroid
 
  23 from nominatim.data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
 
  24 from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
 
  26 LOG = logging.getLogger()
 
  28 def _to_float(numstr: str, max_value: float) -> float:
 
  29     """ Convert the number in string into a float. The number is expected
 
  30         to be in the range of [-max_value, max_value]. Otherwise rises a
 
  34     if not isfinite(num) or num <= -max_value or num >= max_value:
 
  39 class _PostcodeCollector:
 
  40     """ Collector for postcodes of a single country.
 
  43     def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
 
  44         self.country = country
 
  45         self.matcher = matcher
 
  46         self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
 
  47         self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
 
  50     def add(self, postcode: str, x: float, y: float) -> None:
 
  51         """ Add the given postcode to the collection cache. If the postcode
 
  52             already existed, it is overwritten with the new centroid.
 
  54         if self.matcher is not None:
 
  55             normalized: Optional[str]
 
  56             if self.normalization_cache and self.normalization_cache[0] == postcode:
 
  57                 normalized = self.normalization_cache[1]
 
  59                 match = self.matcher.match(postcode)
 
  60                 normalized = self.matcher.normalize(match) if match else None
 
  61                 self.normalization_cache = (postcode, normalized)
 
  64                 self.collected[normalized] += (x, y)
 
  67     def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
 
  68         """ Update postcodes for the country from the postcodes selected so far
 
  69             as well as any externally supplied postcodes.
 
  71         self._update_from_external(analyzer, project_dir)
 
  72         to_add, to_delete, to_update = self._compute_changes(conn)
 
  74         LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
 
  75                  self.country, len(to_add), len(to_delete), len(to_update))
 
  77         with conn.cursor() as cur:
 
  80                     """INSERT INTO location_postcode
 
  81                          (place_id, indexed_status, country_code,
 
  82                           postcode, geometry) VALUES %s""",
 
  84                     template=pysql.SQL("""(nextval('seq_place'), 1, {},
 
  85                                           %s, 'SRID=4326;POINT(%s %s)')
 
  86                                        """).format(pysql.Literal(self.country)))
 
  88                 cur.execute("""DELETE FROM location_postcode
 
  89                                WHERE country_code = %s and postcode = any(%s)
 
  90                             """, (self.country, to_delete))
 
  93                     pysql.SQL("""UPDATE location_postcode
 
  94                                  SET indexed_status = 2,
 
  95                                      geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
 
  96                                  FROM (VALUES %s) AS v (pc, x, y)
 
  97                                  WHERE country_code = {} and postcode = pc
 
  98                               """).format(pysql.Literal(self.country)), to_update)
 
 101     def _compute_changes(self, conn: Connection) \
 
 102           -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[str, float, float]]]:
 
 103         """ Compute which postcodes from the collected postcodes have to be
 
 104             added or modified and which from the location_postcode table
 
 109         with conn.cursor() as cur:
 
 110             cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
 
 111                            FROM location_postcode
 
 112                            WHERE country_code = %s""",
 
 114             for postcode, x, y in cur:
 
 115                 pcobj = self.collected.pop(postcode, None)
 
 117                     newx, newy = pcobj.centroid()
 
 118                     if (x - newx) > 0.0000001 or (y - newy) > 0.0000001:
 
 119                         to_update.append((postcode, newx, newy))
 
 121                     to_delete.append(postcode)
 
 123         to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
 
 124         self.collected = defaultdict(PointsCentroid)
 
 126         return to_add, to_delete, to_update
 
 129     def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
 
 130         """ Look for an external postcode file for the active country in
 
 131             the project directory and add missing postcodes when found.
 
 133         csvfile = self._open_external(project_dir)
 
 138             reader = csv.DictReader(csvfile)
 
 140                 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
 
 141                     LOG.warning("Bad format for external postcode file for country '%s'."
 
 142                                 " Ignored.", self.country)
 
 144                 postcode = analyzer.normalize_postcode(row['postcode'])
 
 145                 if postcode not in self.collected:
 
 147                         # Do float conversation separately, it might throw
 
 148                         centroid = (_to_float(row['lon'], 180),
 
 149                                     _to_float(row['lat'], 90))
 
 150                         self.collected[postcode] += centroid
 
 152                         LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
 
 153                                     row['lat'], row['lon'], self.country)
 
 159     def _open_external(self, project_dir: Path) -> Optional[TextIO]:
 
 160         fname = project_dir / f'{self.country}_postcodes.csv'
 
 163             LOG.info("Using external postcode file '%s'.", fname)
 
 164             return open(fname, 'r', encoding='utf-8')
 
 166         fname = project_dir / f'{self.country}_postcodes.csv.gz'
 
 169             LOG.info("Using external postcode file '%s'.", fname)
 
 170             return gzip.open(fname, 'rt')
 
 175 def update_postcodes(dsn: str, project_dir: Path, tokenizer: AbstractTokenizer) -> None:
 
 176     """ Update the table of artificial postcodes.
 
 178         Computes artificial postcode centroids from the placex table,
 
 179         potentially enhances it with external data and then updates the
 
 180         postcodes in the table 'location_postcode'.
 
 182     matcher = PostcodeFormatter()
 
 183     with tokenizer.name_analyzer() as analyzer:
 
 184         with connect(dsn) as conn:
 
 185             # First get the list of countries that currently have postcodes.
 
 186             # (Doing this before starting to insert, so it is fast on import.)
 
 187             with conn.cursor() as cur:
 
 188                 cur.execute("SELECT DISTINCT country_code FROM location_postcode")
 
 189                 todo_countries = set((row[0] for row in cur))
 
 191             # Recompute the list of valid postcodes from placex.
 
 192             with conn.cursor(name="placex_postcodes") as cur:
 
 194                 SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
 
 196                         COALESCE(plx.country_code,
 
 197                                  get_country_code(ST_Centroid(pl.geometry))) as cc,
 
 198                         pl.address->'postcode' as pc,
 
 199                         COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
 
 200                       FROM place AS pl LEFT OUTER JOIN placex AS plx
 
 201                              ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
 
 202                     WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
 
 203                 WHERE pc IS NOT null AND cc IS NOT null
 
 208                 for country, postcode, x, y in cur:
 
 209                     if collector is None or country != collector.country:
 
 210                         if collector is not None:
 
 211                             collector.commit(conn, analyzer, project_dir)
 
 212                         collector = _PostcodeCollector(country, matcher.get_matcher(country))
 
 213                         todo_countries.discard(country)
 
 214                     collector.add(postcode, x, y)
 
 216                 if collector is not None:
 
 217                     collector.commit(conn, analyzer, project_dir)
 
 219             # Now handle any countries that are only in the postcode table.
 
 220             for country in todo_countries:
 
 221                 fmt = matcher.get_matcher(country)
 
 222                 _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)
 
 226         analyzer.update_postcodes_from_db()
 
 228 def can_compute(dsn: str) -> bool:
 
 230         Check that the place table exists so that
 
 231         postcodes can be computed.
 
 233     with connect(dsn) as conn:
 
 234         return conn.table_exists('place')