]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/postcodes.py
prepare release 5.3.2-post1
[nominatim.git] / src / nominatim_db / tools / postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
10 """
11 from typing import Optional, Tuple, Dict, TextIO
12 from collections import defaultdict
13 from pathlib import Path
14 import csv
15 import gzip
16 import logging
17 from math import isfinite
18
19 from psycopg import sql as pysql
20
21 from ..db.connection import connect, Connection, table_exists
22 from ..utils.centroid import PointsCentroid
23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
25
26 LOG = logging.getLogger()
27
28
29 def _to_float(numstr: str, max_value: float) -> float:
30     """ Convert the number in string into a float. The number is expected
31         to be in the range of [-max_value, max_value]. Otherwise rises a
32         ValueError.
33     """
34     num = float(numstr)
35     if not isfinite(num) or num <= -max_value or num >= max_value:
36         raise ValueError()
37
38     return num
39
40
41 def _extent_to_rank(extent: int) -> int:
42     """ Guess a suitable search rank from the extent of a postcode.
43     """
44     if extent <= 100:
45         return 25
46     if extent <= 3000:
47         return 23
48     return 21
49
50
51 class _PostcodeCollector:
52     """ Collector for postcodes of a single country.
53     """
54
55     def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher],
56                  default_extent: int, exclude: set[str] = set()):
57         self.country = country
58         self.matcher = matcher
59         self.extent = default_extent
60         self.exclude = exclude
61         self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
62         self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
63
64     def add(self, postcode: str, x: float, y: float) -> None:
65         """ Add the given postcode to the collection cache. If the postcode
66             already existed, it is overwritten with the new centroid.
67         """
68         if self.matcher is not None:
69             normalized: Optional[str]
70             if self.normalization_cache and self.normalization_cache[0] == postcode:
71                 normalized = self.normalization_cache[1]
72             else:
73                 match = self.matcher.match(postcode)
74                 normalized = self.matcher.normalize(match) if match else None
75                 self.normalization_cache = (postcode, normalized)
76
77             if normalized and normalized not in self.exclude:
78                 self.collected[normalized] += (x, y)
79
80     def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
81                project_dir: Optional[Path], is_initial: bool) -> None:
82         """ Update postcodes for the country from the postcodes selected so far.
83
84             When 'project_dir' is set, then any postcode files found in this
85             directory are taken into account as well.
86         """
87         if project_dir is not None:
88             self._update_from_external(analyzer, project_dir)
89
90         if is_initial:
91             to_delete = []
92         else:
93             with conn.cursor() as cur:
94                 cur.execute("""SELECT postcode FROM location_postcodes
95                                WHERE country_code = %s AND osm_id is null""",
96                             (self.country, ))
97                 to_delete = [row[0] for row in cur if row[0] not in self.collected]
98
99         to_add = [dict(zip(('pc', 'x', 'y'), (k, *v.centroid())))
100                   for k, v in self.collected.items()]
101         self.collected = defaultdict(PointsCentroid)
102
103         LOG.info("Processing country '%s' (%s added, %s deleted).",
104                  self.country, len(to_add), len(to_delete))
105
106         with conn.cursor() as cur:
107             if to_add:
108                 columns = ['country_code',
109                            'rank_search',
110                            'postcode',
111                            'centroid',
112                            'geometry']
113                 values = [pysql.Literal(self.country),
114                           pysql.Literal(_extent_to_rank(self.extent)),
115                           pysql.Placeholder('pc'),
116                           pysql.SQL('ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326)'),
117                           pysql.SQL("""expand_by_meters(
118                                            ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), {})""")
119                                .format(pysql.Literal(self.extent))]
120                 if is_initial:
121                     columns.extend(('place_id', 'indexed_status'))
122                     values.extend((pysql.SQL("nextval('seq_place')"), pysql.Literal(1)))
123
124                 cur.executemany(pysql.SQL("INSERT INTO location_postcodes ({}) VALUES ({})")
125                                      .format(pysql.SQL(',')
126                                                   .join(pysql.Identifier(c) for c in columns),
127                                              pysql.SQL(',').join(values)),
128                                 to_add)
129             if to_delete:
130                 cur.execute("""DELETE FROM location_postcodes
131                                WHERE country_code = %s and postcode = any(%s)
132                                      AND osm_id is null
133                             """, (self.country, to_delete))
134
135     def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
136         """ Look for an external postcode file for the active country in
137             the project directory and add missing postcodes when found.
138         """
139         csvfile = self._open_external(project_dir)
140         if csvfile is None:
141             return
142
143         try:
144             reader = csv.DictReader(csvfile)
145             for row in reader:
146                 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
147                     LOG.warning("Bad format for external postcode file for country '%s'."
148                                 " Ignored.", self.country)
149                     return
150                 postcode = analyzer.normalize_postcode(row['postcode'])
151                 if postcode not in self.collected:
152                     try:
153                         # Do float conversation separately, it might throw
154                         centroid = (_to_float(row['lon'], 180),
155                                     _to_float(row['lat'], 90))
156                         self.collected[postcode] += centroid
157                     except ValueError:
158                         LOG.warning("Bad coordinates %s, %s in '%s' country postcode file.",
159                                     row['lat'], row['lon'], self.country)
160
161         finally:
162             csvfile.close()
163
164     def _open_external(self, project_dir: Path) -> Optional[TextIO]:
165         fname = project_dir / f'{self.country}_postcodes.csv'
166
167         if fname.is_file():
168             LOG.info("Using external postcode file '%s'.", fname)
169             return open(fname, 'r', encoding='utf-8')
170
171         fname = project_dir / f'{self.country}_postcodes.csv.gz'
172
173         if fname.is_file():
174             LOG.info("Using external postcode file '%s'.", fname)
175             return gzip.open(fname, 'rt', encoding='utf-8')
176
177         return None
178
179
180 def update_postcodes(dsn: str, project_dir: Optional[Path],
181                      tokenizer: AbstractTokenizer, force_reimport: bool = False) -> None:
182     """ Update the table of postcodes from the input tables
183         placex and place_postcode.
184     """
185     matcher = PostcodeFormatter()
186     with tokenizer.name_analyzer() as analyzer:
187         with connect(dsn) as conn:
188             # Backfill country_code column where required
189             conn.execute("""UPDATE place_postcode
190                               SET country_code = get_country_code(centroid)
191                               WHERE country_code is null
192                          """)
193             if force_reimport:
194                 conn.execute("TRUNCATE location_postcodes")
195                 is_initial = True
196             else:
197                 is_initial = _is_postcode_table_empty(conn)
198             if is_initial:
199                 conn.execute("""ALTER TABLE location_postcodes
200                                 DISABLE TRIGGER location_postcodes_before_insert""")
201             # Now update first postcode areas
202             _update_postcode_areas(conn, analyzer, matcher, is_initial)
203             # Then fill with estimated postcode centroids from other info
204             _update_guessed_postcode(conn, analyzer, matcher, project_dir, is_initial)
205             if is_initial:
206                 conn.execute("""ALTER TABLE location_postcodes
207                                 ENABLE TRIGGER location_postcodes_before_insert""")
208             conn.commit()
209
210         analyzer.update_postcodes_from_db()
211
212
213 def _is_postcode_table_empty(conn: Connection) -> bool:
214     """ Check if there are any entries in the location_postcodes table yet.
215     """
216     with conn.cursor() as cur:
217         cur.execute("SELECT place_id FROM location_postcodes LIMIT 1")
218         return cur.fetchone() is None
219
220
221 def _insert_postcode_areas(conn: Connection, country_code: str,
222                            extent: int, pcs: list[dict[str, str]],
223                            is_initial: bool) -> None:
224     if pcs:
225         with conn.cursor() as cur:
226             columns = ['osm_id', 'country_code',
227                        'rank_search', 'postcode',
228                        'centroid', 'geometry']
229             values = [pysql.Identifier('osm_id'), pysql.Identifier('country_code'),
230                       pysql.Literal(_extent_to_rank(extent)), pysql.Placeholder('out'),
231                       pysql.Identifier('centroid'), pysql.Identifier('geometry')]
232             if is_initial:
233                 columns.extend(('place_id', 'indexed_status'))
234                 values.extend((pysql.SQL("nextval('seq_place')"), pysql.Literal(1)))
235
236             cur.executemany(
237                 pysql.SQL(
238                     """ INSERT INTO location_postcodes ({})
239                             SELECT {} FROM place_postcode
240                             WHERE osm_type = 'R'
241                                   and country_code = {} and postcode = %(in)s
242                                   and geometry is not null
243                     """).format(pysql.SQL(',')
244                                      .join(pysql.Identifier(c) for c in columns),
245                                 pysql.SQL(',').join(values),
246                                 pysql.Literal(country_code)),
247                 pcs)
248
249
250 def _update_postcode_areas(conn: Connection, analyzer: AbstractAnalyzer,
251                            matcher: PostcodeFormatter, is_initial: bool) -> None:
252     """ Update the postcode areas made from postcode boundaries.
253     """
254     # first delete all areas that have gone
255     if not is_initial:
256         conn.execute(""" DELETE FROM location_postcodes pc
257                          WHERE pc.osm_id is not null
258                            AND NOT EXISTS(
259                                   SELECT * FROM place_postcode pp
260                                   WHERE pp.osm_type = 'R' and pp.osm_id = pc.osm_id
261                                         and geometry is not null)
262                     """)
263     # now insert all in country batches, triggers will ensure proper updates
264     with conn.cursor() as cur:
265         cur.execute(""" SELECT country_code, postcode FROM place_postcode
266                         WHERE geometry is not null and osm_type = 'R'
267                         ORDER BY country_code
268                     """)
269         country_code = None
270         fmt = None
271         pcs = []
272         for cc, postcode in cur:
273             if country_code is None:
274                 country_code = cc
275                 fmt = matcher.get_matcher(country_code)
276             elif country_code != cc:
277                 _insert_postcode_areas(conn, country_code,
278                                        matcher.get_postcode_extent(country_code), pcs,
279                                        is_initial)
280                 country_code = cc
281                 fmt = matcher.get_matcher(country_code)
282                 pcs = []
283
284             if fmt is not None:
285                 if (m := fmt.match(postcode)):
286                     pcs.append({'out': fmt.normalize(m), 'in': postcode})
287
288         if country_code is not None and pcs:
289             _insert_postcode_areas(conn, country_code,
290                                    matcher.get_postcode_extent(country_code), pcs,
291                                    is_initial)
292
293
294 def _update_guessed_postcode(conn: Connection, analyzer: AbstractAnalyzer,
295                              matcher: PostcodeFormatter, project_dir: Optional[Path],
296                              is_initial: bool) -> None:
297     """ Computes artificial postcode centroids from the placex table,
298         potentially enhances it with external data and then updates the
299         postcodes in the table 'location_postcodes'.
300     """
301     # First get the list of countries that currently have postcodes.
302     # (Doing this before starting to insert, so it is fast on import.)
303     if is_initial:
304         todo_countries: set[str] = set()
305     else:
306         with conn.cursor() as cur:
307             cur.execute("""SELECT DISTINCT country_code FROM location_postcodes
308                             WHERE osm_id is null""")
309             todo_countries = {row[0] for row in cur}
310
311     # Next, get the list of postcodes that are already covered by areas.
312     area_pcs = defaultdict(set)
313     with conn.cursor() as cur:
314         cur.execute("""SELECT country_code, postcode
315                        FROM location_postcodes WHERE osm_id is not null
316                        ORDER BY country_code""")
317         for cc, pc in cur:
318             area_pcs[cc].add(pc)
319
320     # Create a temporary table which contains coverage of the postcode areas.
321     with conn.cursor() as cur:
322         cur.execute("DROP TABLE IF EXISTS _global_postcode_area")
323         cur.execute("""CREATE TABLE _global_postcode_area AS
324                        (SELECT ST_SubDivide(ST_SimplifyPreserveTopology(
325                                               ST_Union(geometry), 0.00001), 128) as geometry
326                         FROM place_postcode WHERE geometry is not null)
327                     """)
328         cur.execute("CREATE INDEX ON _global_postcode_area USING gist(geometry)")
329
330     # Recompute the list of valid postcodes from placex.
331     with conn.cursor(name="placex_postcodes") as cur:
332         cur.execute("""
333             SELECT country_code, postcode, ST_X(centroid), ST_Y(centroid)
334               FROM (
335                 (SELECT country_code, address->'postcode' as postcode, centroid
336                   FROM placex WHERE address ? 'postcode')
337                 UNION
338                 (SELECT country_code, postcode, centroid
339                  FROM place_postcode WHERE geometry is null)
340               ) x
341               WHERE not postcode like '%,%' and not postcode like '%;%'
342                     AND NOT EXISTS(SELECT * FROM _global_postcode_area g
343                                    WHERE ST_Intersects(x.centroid, g.geometry))
344               ORDER BY country_code""")
345
346         collector = None
347
348         for country, postcode, x, y in cur:
349             if collector is None or country != collector.country:
350                 if collector is not None:
351                     collector.commit(conn, analyzer, project_dir, is_initial)
352                 collector = _PostcodeCollector(country, matcher.get_matcher(country),
353                                                matcher.get_postcode_extent(country),
354                                                exclude=area_pcs[country])
355                 todo_countries.discard(country)
356             collector.add(postcode, x, y)
357
358         if collector is not None:
359             collector.commit(conn, analyzer, project_dir, is_initial)
360
361     # Now handle any countries that are only in the postcode table.
362     for country in todo_countries:
363         fmt = matcher.get_matcher(country)
364         ext = matcher.get_postcode_extent(country)
365         _PostcodeCollector(country, fmt, ext,
366                            exclude=area_pcs[country]).commit(conn, analyzer, project_dir, False)
367
368     conn.execute("DROP TABLE IF EXISTS _global_postcode_area")
369
370
371 def can_compute(dsn: str) -> bool:
372     """ Check that the necessary tables exist so that postcodes can be computed.
373     """
374     with connect(dsn) as conn:
375         return table_exists(conn, 'place_postcode')