1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
11 from typing import Optional, Tuple, Dict, TextIO
12 from collections import defaultdict
13 from pathlib import Path
17 from math import isfinite
19 from psycopg import sql as pysql
21 from ..db.connection import connect, Connection, table_exists
22 from ..utils.centroid import PointsCentroid
23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
26 LOG = logging.getLogger()
29 def _to_float(numstr: str, max_value: float) -> float:
30 """ Convert the number in string into a float. The number is expected
31 to be in the range of [-max_value, max_value]. Otherwise rises a
35 if not isfinite(num) or num <= -max_value or num >= max_value:
41 def _extent_to_rank(extent: int) -> int:
42 """ Guess a suitable search rank from the extent of a postcode.
51 class _PostcodeCollector:
52 """ Collector for postcodes of a single country.
55 def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher],
56 default_extent: int, exclude: set[str] = set()):
57 self.country = country
58 self.matcher = matcher
59 self.extent = default_extent
60 self.exclude = exclude
61 self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
62 self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
64 def add(self, postcode: str, x: float, y: float) -> None:
65 """ Add the given postcode to the collection cache. If the postcode
66 already existed, it is overwritten with the new centroid.
68 if self.matcher is not None:
69 normalized: Optional[str]
70 if self.normalization_cache and self.normalization_cache[0] == postcode:
71 normalized = self.normalization_cache[1]
73 match = self.matcher.match(postcode)
74 normalized = self.matcher.normalize(match) if match else None
75 self.normalization_cache = (postcode, normalized)
77 if normalized and normalized not in self.exclude:
78 self.collected[normalized] += (x, y)
80 def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
81 project_dir: Optional[Path], is_initial: bool) -> None:
82 """ Update postcodes for the country from the postcodes selected so far.
84 When 'project_dir' is set, then any postcode files found in this
85 directory are taken into account as well.
87 if project_dir is not None:
88 self._update_from_external(analyzer, project_dir)
93 with conn.cursor() as cur:
94 cur.execute("""SELECT postcode FROM location_postcodes
95 WHERE country_code = %s AND osm_id is null""",
97 to_delete = [row[0] for row in cur if row[0] not in self.collected]
99 to_add = [dict(zip(('pc', 'x', 'y'), (k, *v.centroid())))
100 for k, v in self.collected.items()]
101 self.collected = defaultdict(PointsCentroid)
103 LOG.info("Processing country '%s' (%s added, %s deleted).",
104 self.country, len(to_add), len(to_delete))
106 with conn.cursor() as cur:
108 columns = ['country_code',
113 values = [pysql.Literal(self.country),
114 pysql.Literal(_extent_to_rank(self.extent)),
115 pysql.Placeholder('pc'),
116 pysql.SQL('ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326)'),
117 pysql.SQL("""expand_by_meters(
118 ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), {})""")
119 .format(pysql.Literal(self.extent))]
121 columns.extend(('place_id', 'indexed_status'))
122 values.extend((pysql.SQL("nextval('seq_place')"), pysql.Literal(1)))
124 cur.executemany(pysql.SQL("INSERT INTO location_postcodes ({}) VALUES ({})")
125 .format(pysql.SQL(',')
126 .join(pysql.Identifier(c) for c in columns),
127 pysql.SQL(',').join(values)),
130 cur.execute("""DELETE FROM location_postcodes
131 WHERE country_code = %s and postcode = any(%s)
133 """, (self.country, to_delete))
135 def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
136 """ Look for an external postcode file for the active country in
137 the project directory and add missing postcodes when found.
139 csvfile = self._open_external(project_dir)
144 reader = csv.DictReader(csvfile)
146 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
147 LOG.warning("Bad format for external postcode file for country '%s'."
148 " Ignored.", self.country)
150 postcode = analyzer.normalize_postcode(row['postcode'])
151 if postcode not in self.collected:
153 # Do float conversation separately, it might throw
154 centroid = (_to_float(row['lon'], 180),
155 _to_float(row['lat'], 90))
156 self.collected[postcode] += centroid
158 LOG.warning("Bad coordinates %s, %s in '%s' country postcode file.",
159 row['lat'], row['lon'], self.country)
164 def _open_external(self, project_dir: Path) -> Optional[TextIO]:
165 fname = project_dir / f'{self.country}_postcodes.csv'
168 LOG.info("Using external postcode file '%s'.", fname)
169 return open(fname, 'r', encoding='utf-8')
171 fname = project_dir / f'{self.country}_postcodes.csv.gz'
174 LOG.info("Using external postcode file '%s'.", fname)
175 return gzip.open(fname, 'rt', encoding='utf-8')
180 def update_postcodes(dsn: str, project_dir: Optional[Path],
181 tokenizer: AbstractTokenizer, force_reimport: bool = False) -> None:
182 """ Update the table of postcodes from the input tables
183 placex and place_postcode.
185 matcher = PostcodeFormatter()
186 with tokenizer.name_analyzer() as analyzer:
187 with connect(dsn) as conn:
188 # Backfill country_code column where required
189 conn.execute("""UPDATE place_postcode
190 SET country_code = get_country_code(centroid)
191 WHERE country_code is null
194 conn.execute("TRUNCATE location_postcodes")
197 is_initial = _is_postcode_table_empty(conn)
199 conn.execute("""ALTER TABLE location_postcodes
200 DISABLE TRIGGER location_postcodes_before_insert""")
201 # Now update first postcode areas
202 _update_postcode_areas(conn, analyzer, matcher, is_initial)
203 # Then fill with estimated postcode centroids from other info
204 _update_guessed_postcode(conn, analyzer, matcher, project_dir, is_initial)
206 conn.execute("""ALTER TABLE location_postcodes
207 ENABLE TRIGGER location_postcodes_before_insert""")
210 analyzer.update_postcodes_from_db()
213 def _is_postcode_table_empty(conn: Connection) -> bool:
214 """ Check if there are any entries in the location_postcodes table yet.
216 with conn.cursor() as cur:
217 cur.execute("SELECT place_id FROM location_postcodes LIMIT 1")
218 return cur.fetchone() is None
221 def _insert_postcode_areas(conn: Connection, country_code: str,
222 extent: int, pcs: list[dict[str, str]],
223 is_initial: bool) -> None:
225 with conn.cursor() as cur:
226 columns = ['osm_id', 'country_code',
227 'rank_search', 'postcode',
228 'centroid', 'geometry']
229 values = [pysql.Identifier('osm_id'), pysql.Identifier('country_code'),
230 pysql.Literal(_extent_to_rank(extent)), pysql.Placeholder('out'),
231 pysql.Identifier('centroid'), pysql.Identifier('geometry')]
233 columns.extend(('place_id', 'indexed_status'))
234 values.extend((pysql.SQL("nextval('seq_place')"), pysql.Literal(1)))
238 """ INSERT INTO location_postcodes ({})
239 SELECT {} FROM place_postcode
241 and country_code = {} and postcode = %(in)s
242 and geometry is not null
243 """).format(pysql.SQL(',')
244 .join(pysql.Identifier(c) for c in columns),
245 pysql.SQL(',').join(values),
246 pysql.Literal(country_code)),
250 def _update_postcode_areas(conn: Connection, analyzer: AbstractAnalyzer,
251 matcher: PostcodeFormatter, is_initial: bool) -> None:
252 """ Update the postcode areas made from postcode boundaries.
254 # first delete all areas that have gone
256 conn.execute(""" DELETE FROM location_postcodes pc
257 WHERE pc.osm_id is not null
259 SELECT * FROM place_postcode pp
260 WHERE pp.osm_type = 'R' and pp.osm_id = pc.osm_id
261 and geometry is not null)
263 # now insert all in country batches, triggers will ensure proper updates
264 with conn.cursor() as cur:
265 cur.execute(""" SELECT country_code, postcode FROM place_postcode
266 WHERE geometry is not null and osm_type = 'R'
267 ORDER BY country_code
272 for cc, postcode in cur:
273 if country_code is None:
275 fmt = matcher.get_matcher(country_code)
276 elif country_code != cc:
277 _insert_postcode_areas(conn, country_code,
278 matcher.get_postcode_extent(country_code), pcs,
281 fmt = matcher.get_matcher(country_code)
285 if (m := fmt.match(postcode)):
286 pcs.append({'out': fmt.normalize(m), 'in': postcode})
288 if country_code is not None and pcs:
289 _insert_postcode_areas(conn, country_code,
290 matcher.get_postcode_extent(country_code), pcs,
294 def _update_guessed_postcode(conn: Connection, analyzer: AbstractAnalyzer,
295 matcher: PostcodeFormatter, project_dir: Optional[Path],
296 is_initial: bool) -> None:
297 """ Computes artificial postcode centroids from the placex table,
298 potentially enhances it with external data and then updates the
299 postcodes in the table 'location_postcodes'.
301 # First get the list of countries that currently have postcodes.
302 # (Doing this before starting to insert, so it is fast on import.)
304 todo_countries: set[str] = set()
306 with conn.cursor() as cur:
307 cur.execute("""SELECT DISTINCT country_code FROM location_postcodes
308 WHERE osm_id is null""")
309 todo_countries = {row[0] for row in cur}
311 # Next, get the list of postcodes that are already covered by areas.
312 area_pcs = defaultdict(set)
313 with conn.cursor() as cur:
314 cur.execute("""SELECT country_code, postcode
315 FROM location_postcodes WHERE osm_id is not null
316 ORDER BY country_code""")
320 # Create a temporary table which contains coverage of the postcode areas.
321 with conn.cursor() as cur:
322 cur.execute("DROP TABLE IF EXISTS _global_postcode_area")
323 cur.execute("""CREATE TABLE _global_postcode_area AS
324 (SELECT ST_SubDivide(ST_SimplifyPreserveTopology(
325 ST_Union(geometry), 0.00001), 128) as geometry
326 FROM place_postcode WHERE geometry is not null)
328 cur.execute("CREATE INDEX ON _global_postcode_area USING gist(geometry)")
330 # Recompute the list of valid postcodes from placex.
331 with conn.cursor(name="placex_postcodes") as cur:
333 SELECT country_code, postcode, ST_X(centroid), ST_Y(centroid)
335 (SELECT country_code, address->'postcode' as postcode, centroid
336 FROM placex WHERE address ? 'postcode')
338 (SELECT country_code, postcode, centroid
339 FROM place_postcode WHERE geometry is null)
341 WHERE not postcode like '%,%' and not postcode like '%;%'
342 AND NOT EXISTS(SELECT * FROM _global_postcode_area g
343 WHERE ST_Intersects(x.centroid, g.geometry))
344 ORDER BY country_code""")
348 for country, postcode, x, y in cur:
349 if collector is None or country != collector.country:
350 if collector is not None:
351 collector.commit(conn, analyzer, project_dir, is_initial)
352 collector = _PostcodeCollector(country, matcher.get_matcher(country),
353 matcher.get_postcode_extent(country),
354 exclude=area_pcs[country])
355 todo_countries.discard(country)
356 collector.add(postcode, x, y)
358 if collector is not None:
359 collector.commit(conn, analyzer, project_dir, is_initial)
361 # Now handle any countries that are only in the postcode table.
362 for country in todo_countries:
363 fmt = matcher.get_matcher(country)
364 ext = matcher.get_postcode_extent(country)
365 _PostcodeCollector(country, fmt, ext,
366 exclude=area_pcs[country]).commit(conn, analyzer, project_dir, False)
368 conn.execute("DROP TABLE IF EXISTS _global_postcode_area")
371 def can_compute(dsn: str) -> bool:
372 """ Check that the necessary tables exist so that postcodes can be computed.
374 with connect(dsn) as conn:
375 return table_exists(conn, 'place_postcode')