# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
from typing import Optional, Tuple, Dict, TextIO
from collections import defaultdict
from pathlib import Path
import csv
import gzip
import logging
from math import isfinite

from psycopg import sql as pysql

from ..db.connection import connect, Connection, table_exists
from ..utils.centroid import PointsCentroid
from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
# Module-wide logger. getLogger() without a name returns the root logger.
LOG = logging.getLogger()
def _to_float(numstr: str, max_value: float) -> float:
    """ Convert the number in string into a float. The number is expected
        to lie strictly between -max_value and max_value (the bounds
        themselves are rejected). Otherwise raises a ValueError.

        A ValueError is also raised when the string cannot be parsed as
        a float or when it denotes NaN or an infinity.
    """
    num = float(numstr)
    if not isfinite(num) or num <= -max_value or num >= max_value:
        raise ValueError()

    return num
41 def _extent_to_rank(extent: int) -> int:
42 """ Guess a suitable search rank from the extent of a postcode.
class _PostcodeCollector:
    """ Collector for postcodes of a single country.

        Postcodes are fed in one by one with add() and are written to the
        table 'location_postcodes' with commit().
    """

    def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher],
                 default_extent: int, exclude: Optional[set[str]] = None):
        """ Create a collector for the country with the given country code.

            'matcher' is used to validate and normalize incoming postcodes.
            When it is None, add() does not collect anything.
            'default_extent' is the distance in meters by which the centroid
            is expanded to create the postcode geometry. Normalized
            postcodes listed in 'exclude' are never collected by add().
        """
        self.country = country
        self.matcher = matcher
        self.extent = default_extent
        # Create a fresh set per instance instead of sharing one mutable
        # default object between all collectors.
        self.exclude = exclude if exclude is not None else set()
        # Points collected so far, keyed by normalized postcode.
        self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
        # Last (raw postcode, normalized postcode) pair seen by add(), so
        # that runs of the same raw postcode are only normalized once.
        self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None

    def add(self, postcode: str, x: float, y: float) -> None:
        """ Add the given postcode to the collection cache. If the postcode
            already existed, the point (x, y) is added to the points
            already collected for it.

            Postcodes that do not match the country's format or that are
            in the exclusion set are silently dropped.
        """
        if self.matcher is None:
            return

        normalized: Optional[str]
        if self.normalization_cache and self.normalization_cache[0] == postcode:
            normalized = self.normalization_cache[1]
        else:
            match = self.matcher.match(postcode)
            normalized = self.matcher.normalize(match) if match else None
            self.normalization_cache = (postcode, normalized)

        if normalized and normalized not in self.exclude:
            self.collected[normalized] += (x, y)

    def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
               project_dir: Optional[Path]) -> None:
        """ Update postcodes for the country from the postcodes selected so far.

            When 'project_dir' is set, then any postcode files found in this
            directory are taken into account as well.

            The collection cache is empty again after the call.
        """
        if project_dir is not None:
            self._update_from_external(analyzer, project_dir)

        # Only computed postcodes (those without an OSM object) are
        # candidates for deletion.
        with conn.cursor() as cur:
            cur.execute("""SELECT postcode FROM location_postcodes
                           WHERE country_code = %s AND osm_id is null""",
                        (self.country, ))
            to_delete = [row[0] for row in cur if row[0] not in self.collected]

        to_add = [dict(zip(('pc', 'x', 'y'), (k, *v.centroid())))
                  for k, v in self.collected.items()]
        self.collected = defaultdict(PointsCentroid)

        LOG.info("Processing country '%s' (%s added, %s deleted).",
                 self.country, len(to_add), len(to_delete))

        with conn.cursor() as cur:
            if to_add:
                cur.executemany(pysql.SQL(
                    """INSERT INTO location_postcodes
                         (country_code, rank_search, postcode, centroid, geometry)
                       VALUES ({}, {}, %(pc)s,
                               ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
                               expand_by_meters(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), {}))
                    """).format(pysql.Literal(self.country),
                                pysql.Literal(_extent_to_rank(self.extent)),
                                pysql.Literal(self.extent)),
                    to_add)
            if to_delete:
                # Restrict the delete to the same set of rows the list was
                # computed from, so that postcode areas are never touched.
                cur.execute("""DELETE FROM location_postcodes
                               WHERE country_code = %s and postcode = any(%s)
                                     AND osm_id is null
                            """, (self.country, to_delete))
            cur.execute("ANALYSE location_postcodes")

    def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
        """ Look for an external postcode file for the active country in
            the project directory and add missing postcodes when found.

            Postcodes that were already collected are left untouched.
            Rows with unusable coordinates are skipped with a warning.
            A file without the columns 'postcode', 'lat' and 'lon' is
            ignored as a whole.
        """
        csvfile = self._open_external(project_dir)
        if csvfile is None:
            return

        with csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
                    LOG.warning("Bad format for external postcode file for country '%s'."
                                " Ignored.", self.country)
                    return
                postcode = analyzer.normalize_postcode(row['postcode'])
                if postcode not in self.collected:
                    try:
                        # Do float conversion separately, it might throw.
                        # TypeError covers short rows, where DictReader
                        # fills the missing fields with None.
                        centroid = (_to_float(row['lon'], 180),
                                    _to_float(row['lat'], 90))
                        self.collected[postcode] += centroid
                    except (ValueError, TypeError):
                        LOG.warning("Bad coordinates %s, %s in '%s' country postcode file.",
                                    row['lat'], row['lon'], self.country)

    def _open_external(self, project_dir: Path) -> Optional[TextIO]:
        """ Open the external postcode file of the country for reading as
            UTF-8 text. The plain file '<country>_postcodes.csv' takes
            precedence over the gzipped '<country>_postcodes.csv.gz'.

            Returns None when neither file exists. The caller is
            responsible for closing the returned file.
        """
        fname = project_dir / f'{self.country}_postcodes.csv'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            return open(fname, 'r', encoding='utf-8')

        fname = project_dir / f'{self.country}_postcodes.csv.gz'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            # Use an explicit encoding so that both variants of the file
            # are decoded the same way, independent of the locale.
            return gzip.open(fname, 'rt', encoding='utf-8')

        return None
def update_postcodes(dsn: str, project_dir: Optional[Path], tokenizer: AbstractTokenizer) -> None:
    """ Update the table of postcodes from the input tables
        placex and place_postcode.

        'dsn' is the connection string for the database. When 'project_dir'
        is given, external postcode files from that directory are taken
        into account. The name analyzer of 'tokenizer' is asked to update
        its postcodes from the database once the table has been written.
    """
    matcher = PostcodeFormatter()
    with tokenizer.name_analyzer() as analyzer:
        with connect(dsn) as conn:
            # Backfill country_code column where required
            conn.execute("""UPDATE place_postcode
                              SET country_code = get_country_code(centroid)
                              WHERE country_code is null
                         """)
            # Now update first postcode areas
            _update_postcode_areas(conn, analyzer, matcher)
            # Then fill with estimated postcode centroids from other info
            _update_guessed_postcode(conn, analyzer, matcher, project_dir)
            # NOTE(review): made explicit so that the analyzer below is
            # guaranteed to see the new table content - confirm against
            # the complete file.
            conn.commit()

        analyzer.update_postcodes_from_db()
def _insert_postcode_areas(conn: Connection, country_code: str,
                           extent: int, pcs: list[dict[str, str]]) -> None:
    """ Copy the postcode areas of a single country from place_postcode
        into location_postcodes.

        'pcs' is a list of dictionaries with the postcode as found in
        place_postcode under the key 'in' and the postcode to be written
        under the key 'out'. The search rank is derived from 'extent'.
    """
    if not pcs:
        return

    # NOTE(review): the FROM clause and the first WHERE condition were not
    # visible in the reviewed excerpt. They are reconstructed from the
    # queries in _update_postcode_areas() - confirm against the full file.
    with conn.cursor() as cur:
        cur.executemany(
            pysql.SQL(
                """ INSERT INTO location_postcodes
                        (osm_id, country_code, rank_search, postcode, centroid, geometry)
                    SELECT osm_id, country_code, {}, %(out)s, centroid, geometry
                      FROM place_postcode
                     WHERE osm_type = 'R'
                           and country_code = {} and postcode = %(in)s
                           and geometry is not null
                """).format(pysql.Literal(_extent_to_rank(extent)),
                            pysql.Literal(country_code)),
            pcs)
def _update_postcode_areas(conn: Connection, analyzer: AbstractAnalyzer,
                           matcher: PostcodeFormatter) -> None:
    """ Update the postcode areas made from postcode boundaries.

        Areas whose relation no longer exists with a geometry in
        place_postcode are removed. Then all areas with a postcode that
        matches the format of its country are inserted country by country.
    """
    # first delete all areas that have gone
    conn.execute(""" DELETE FROM location_postcodes pc
                     WHERE pc.osm_id is not null
                       AND NOT EXISTS(
                              SELECT * FROM place_postcode pp
                              WHERE pp.osm_type = 'R' and pp.osm_id = pc.osm_id
                                    and geometry is not null)
                 """)
    # now insert all in country batches, triggers will ensure proper updates
    with conn.cursor() as cur:
        cur.execute(""" SELECT country_code, postcode FROM place_postcode
                        WHERE geometry is not null and osm_type = 'R'
                        ORDER BY country_code
                    """)
        country_code = None
        fmt = None
        pcs: list[dict[str, str]] = []
        for cc, postcode in cur:
            if country_code is None:
                country_code = cc
                fmt = matcher.get_matcher(country_code)
            elif country_code != cc:
                # Rows are sorted by country, so the batch is complete.
                _insert_postcode_areas(conn, country_code,
                                       matcher.get_postcode_extent(country_code), pcs)
                country_code = cc
                fmt = matcher.get_matcher(country_code)
                pcs = []

            # NOTE(review): skips the row when no matcher is available for
            # the country; the original guard was not visible - confirm.
            if fmt is not None and (m := fmt.match(postcode)):
                pcs.append({'out': fmt.normalize(m), 'in': postcode})

        if country_code is not None and pcs:
            _insert_postcode_areas(conn, country_code,
                                   matcher.get_postcode_extent(country_code), pcs)
def _update_guessed_postcode(conn: Connection, analyzer: AbstractAnalyzer,
                             matcher: PostcodeFormatter, project_dir: Optional[Path]) -> None:
    """ Computes artificial postcode centroids from the placex table,
        potentially enhances it with external data and then updates the
        postcodes in the table 'location_postcodes'.

        Postcodes that already exist as a postcode area and points that
        lie inside any postcode area are not taken into account.
    """
    # First get the list of countries that currently have postcodes.
    # (Doing this before starting to insert, so it is fast on import.)
    with conn.cursor() as cur:
        cur.execute("""SELECT DISTINCT country_code FROM location_postcodes
                        WHERE osm_id is null""")
        todo_countries = {row[0] for row in cur}

    # Next, get the list of postcodes that are already covered by areas.
    area_pcs = defaultdict(set)
    with conn.cursor() as cur:
        cur.execute("""SELECT country_code, postcode
                       FROM location_postcodes WHERE osm_id is not null
                       ORDER BY country_code""")
        for cc, pc in cur:
            area_pcs[cc].add(pc)

    # Create a temporary table which contains coverage of the postcode areas.
    with conn.cursor() as cur:
        cur.execute("DROP TABLE IF EXISTS _global_postcode_area")
        cur.execute("""CREATE TABLE _global_postcode_area AS
                       (SELECT ST_SubDivide(ST_SimplifyPreserveTopology(
                                              ST_Union(geometry), 0.00001), 128) as geometry
                        FROM place_postcode WHERE geometry is not null)
                    """)
        cur.execute("CREATE INDEX ON _global_postcode_area USING gist(geometry)")
    # Recompute the list of valid postcodes from placex.
    # NOTE(review): the set operator joining the two sub-selects was not
    # visible in the reviewed excerpt; UNION ALL is assumed - confirm
    # against the complete file.
    with conn.cursor(name="placex_postcodes") as cur:
        cur.execute("""
            SELECT country_code, postcode, ST_X(centroid), ST_Y(centroid)
              FROM (
                (SELECT country_code, address->'postcode' as postcode, centroid
                   FROM placex WHERE address ? 'postcode')
                UNION ALL
                (SELECT country_code, postcode, centroid
                   FROM place_postcode WHERE geometry is null)
              ) x
             WHERE not postcode like '%,%' and not postcode like '%;%'
                   AND NOT EXISTS(SELECT * FROM _global_postcode_area g
                                  WHERE ST_Intersects(x.centroid, g.geometry))
             ORDER BY country_code""")

        # Rows arrive sorted by country. Write out the postcodes of a
        # country as soon as the first row of the next country shows up.
        collector = None
        for country, postcode, x, y in cur:
            if collector is None or country != collector.country:
                if collector is not None:
                    collector.commit(conn, analyzer, project_dir)
                collector = _PostcodeCollector(country, matcher.get_matcher(country),
                                               matcher.get_postcode_extent(country),
                                               exclude=area_pcs[country])
                todo_countries.discard(country)
            collector.add(postcode, x, y)

        if collector is not None:
            collector.commit(conn, analyzer, project_dir)

    # Now handle any countries that are only in the postcode table.
    for country in todo_countries:
        fmt = matcher.get_matcher(country)
        ext = matcher.get_postcode_extent(country)
        _PostcodeCollector(country, fmt, ext,
                           exclude=area_pcs[country]).commit(conn, analyzer, project_dir)

    conn.execute("DROP TABLE IF EXISTS _global_postcode_area")
def can_compute(dsn: str) -> bool:
    """ Check that the necessary tables exist so that postcodes can be computed.

        Opens a connection to the database given by 'dsn' and returns
        whether the table 'place_postcode' exists.
    """
    with connect(dsn) as conn:
        return table_exists(conn, 'place_postcode')