]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/postcodes.py
release 5.2.0.post7
[nominatim.git] / src / nominatim_db / tools / postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
10 """
11 from typing import Optional, Tuple, Dict, TextIO
12 from collections import defaultdict
13 from pathlib import Path
14 import csv
15 import gzip
16 import logging
17 from math import isfinite
18
19 from psycopg import sql as pysql
20
21 from ..db.connection import connect, Connection, table_exists
22 from ..utils.centroid import PointsCentroid
23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
25
26 LOG = logging.getLogger()
27
28
29 def _to_float(numstr: str, max_value: float) -> float:
30     """ Convert the number in string into a float. The number is expected
31         to be in the range of [-max_value, max_value]. Otherwise rises a
32         ValueError.
33     """
34     num = float(numstr)
35     if not isfinite(num) or num <= -max_value or num >= max_value:
36         raise ValueError()
37
38     return num
39
40
41 def _extent_to_rank(extent: int) -> int:
42     """ Guess a suitable search rank from the extent of a postcode.
43     """
44     if extent <= 100:
45         return 25
46     if extent <= 3000:
47         return 23
48     return 21
49
50
51 class _PostcodeCollector:
52     """ Collector for postcodes of a single country.
53     """
54
55     def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher],
56                  default_extent: int, exclude: set[str] = set()):
57         self.country = country
58         self.matcher = matcher
59         self.extent = default_extent
60         self.exclude = exclude
61         self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
62         self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
63
64     def add(self, postcode: str, x: float, y: float) -> None:
65         """ Add the given postcode to the collection cache. If the postcode
66             already existed, it is overwritten with the new centroid.
67         """
68         if self.matcher is not None:
69             normalized: Optional[str]
70             if self.normalization_cache and self.normalization_cache[0] == postcode:
71                 normalized = self.normalization_cache[1]
72             else:
73                 match = self.matcher.match(postcode)
74                 normalized = self.matcher.normalize(match) if match else None
75                 self.normalization_cache = (postcode, normalized)
76
77             if normalized and normalized not in self.exclude:
78                 self.collected[normalized] += (x, y)
79
80     def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
81                project_dir: Optional[Path]) -> None:
82         """ Update postcodes for the country from the postcodes selected so far.
83
84             When 'project_dir' is set, then any postcode files found in this
85             directory are taken into account as well.
86         """
87         if project_dir is not None:
88             self._update_from_external(analyzer, project_dir)
89
90         with conn.cursor() as cur:
91             cur.execute("""SELECT postcode FROM location_postcodes
92                            WHERE country_code = %s AND osm_id is null""",
93                         (self.country, ))
94             to_delete = [row[0] for row in cur if row[0] not in self.collected]
95
96         to_add = [dict(zip(('pc', 'x', 'y'), (k, *v.centroid())))
97                   for k, v in self.collected.items()]
98         self.collected = defaultdict(PointsCentroid)
99
100         LOG.info("Processing country '%s' (%s added, %s deleted).",
101                  self.country, len(to_add), len(to_delete))
102
103         with conn.cursor() as cur:
104             if to_add:
105                 cur.executemany(pysql.SQL(
106                     """INSERT INTO location_postcodes
107                          (country_code, rank_search, postcode, centroid, geometry)
108                        VALUES ({}, {}, %(pc)s,
109                                ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326),
110                                expand_by_meters(ST_SetSRID(ST_MakePoint(%(x)s, %(y)s), 4326), {}))
111                     """).format(pysql.Literal(self.country),
112                                 pysql.Literal(_extent_to_rank(self.extent)),
113                                 pysql.Literal(self.extent)),
114                     to_add)
115             if to_delete:
116                 cur.execute("""DELETE FROM location_postcodes
117                                WHERE country_code = %s and postcode = any(%s)
118                                      AND osm_id is null
119                             """, (self.country, to_delete))
120             cur.execute("ANALYSE location_postcodes")
121
122     def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
123         """ Look for an external postcode file for the active country in
124             the project directory and add missing postcodes when found.
125         """
126         csvfile = self._open_external(project_dir)
127         if csvfile is None:
128             return
129
130         try:
131             reader = csv.DictReader(csvfile)
132             for row in reader:
133                 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
134                     LOG.warning("Bad format for external postcode file for country '%s'."
135                                 " Ignored.", self.country)
136                     return
137                 postcode = analyzer.normalize_postcode(row['postcode'])
138                 if postcode not in self.collected:
139                     try:
140                         # Do float conversation separately, it might throw
141                         centroid = (_to_float(row['lon'], 180),
142                                     _to_float(row['lat'], 90))
143                         self.collected[postcode] += centroid
144                     except ValueError:
145                         LOG.warning("Bad coordinates %s, %s in '%s' country postcode file.",
146                                     row['lat'], row['lon'], self.country)
147
148         finally:
149             csvfile.close()
150
151     def _open_external(self, project_dir: Path) -> Optional[TextIO]:
152         fname = project_dir / f'{self.country}_postcodes.csv'
153
154         if fname.is_file():
155             LOG.info("Using external postcode file '%s'.", fname)
156             return open(fname, 'r', encoding='utf-8')
157
158         fname = project_dir / f'{self.country}_postcodes.csv.gz'
159
160         if fname.is_file():
161             LOG.info("Using external postcode file '%s'.", fname)
162             return gzip.open(fname, 'rt')
163
164         return None
165
166
167 def update_postcodes(dsn: str, project_dir: Optional[Path], tokenizer: AbstractTokenizer) -> None:
168     """ Update the table of postcodes from the input tables
169         placex and place_postcode.
170     """
171     matcher = PostcodeFormatter()
172     with tokenizer.name_analyzer() as analyzer:
173         with connect(dsn) as conn:
174             # Backfill country_code column where required
175             conn.execute("""UPDATE place_postcode
176                               SET country_code = get_country_code(centroid)
177                               WHERE country_code is null
178                          """)
179             # Now update first postcode areas
180             _update_postcode_areas(conn, analyzer, matcher)
181             # Then fill with estimated postcode centroids from other info
182             _update_guessed_postcode(conn, analyzer, matcher, project_dir)
183             conn.commit()
184
185         analyzer.update_postcodes_from_db()
186
187
188 def _insert_postcode_areas(conn: Connection, country_code: str,
189                            extent: int, pcs: list[dict[str, str]]) -> None:
190     if pcs:
191         with conn.cursor() as cur:
192             cur.executemany(
193                 pysql.SQL(
194                     """ INSERT INTO location_postcodes
195                             (osm_id, country_code, rank_search, postcode, centroid, geometry)
196                             SELECT osm_id, country_code, {}, %(out)s, centroid, geometry
197                             FROM place_postcode
198                             WHERE osm_type = 'R'
199                                   and country_code = {} and postcode = %(in)s
200                                   and geometry is not null
201                     """).format(pysql.Literal(_extent_to_rank(extent)),
202                                 pysql.Literal(country_code)),
203                 pcs)
204
205
206 def _update_postcode_areas(conn: Connection, analyzer: AbstractAnalyzer,
207                            matcher: PostcodeFormatter) -> None:
208     """ Update the postcode areas made from postcode boundaries.
209     """
210     # first delete all areas that have gone
211     conn.execute(""" DELETE FROM location_postcodes pc
212                      WHERE pc.osm_id is not null
213                        AND NOT EXISTS(
214                               SELECT * FROM place_postcode pp
215                               WHERE pp.osm_type = 'R' and pp.osm_id = pc.osm_id
216                                     and geometry is not null)
217                 """)
218     # now insert all in country batches, triggers will ensure proper updates
219     with conn.cursor() as cur:
220         cur.execute(""" SELECT country_code, postcode FROM place_postcode
221                         WHERE geometry is not null and osm_type = 'R'
222                         ORDER BY country_code
223                     """)
224         country_code = None
225         fmt = None
226         pcs = []
227         for cc, postcode in cur:
228             if country_code is None:
229                 country_code = cc
230                 fmt = matcher.get_matcher(country_code)
231             elif country_code != cc:
232                 _insert_postcode_areas(conn, country_code,
233                                        matcher.get_postcode_extent(country_code), pcs)
234                 country_code = cc
235                 fmt = matcher.get_matcher(country_code)
236                 pcs = []
237
238             if fmt is not None:
239                 if (m := fmt.match(postcode)):
240                     pcs.append({'out': fmt.normalize(m), 'in': postcode})
241
242         if country_code is not None and pcs:
243             _insert_postcode_areas(conn, country_code,
244                                    matcher.get_postcode_extent(country_code), pcs)
245
246
247 def _update_guessed_postcode(conn: Connection, analyzer: AbstractAnalyzer,
248                              matcher: PostcodeFormatter, project_dir: Optional[Path]) -> None:
249     """ Computes artificial postcode centroids from the placex table,
250         potentially enhances it with external data and then updates the
251         postcodes in the table 'location_postcodes'.
252     """
253     # First get the list of countries that currently have postcodes.
254     # (Doing this before starting to insert, so it is fast on import.)
255     with conn.cursor() as cur:
256         cur.execute("""SELECT DISTINCT country_code FROM location_postcodes
257                         WHERE osm_id is null""")
258         todo_countries = {row[0] for row in cur}
259
260     # Next, get the list of postcodes that are already covered by areas.
261     area_pcs = defaultdict(set)
262     with conn.cursor() as cur:
263         cur.execute("""SELECT country_code, postcode
264                        FROM location_postcodes WHERE osm_id is not null
265                        ORDER BY country_code""")
266         for cc, pc in cur:
267             area_pcs[cc].add(pc)
268
269     # Create a temporary table which contains coverage of the postcode areas.
270     with conn.cursor() as cur:
271         cur.execute("DROP TABLE IF EXISTS _global_postcode_area")
272         cur.execute("""CREATE TABLE _global_postcode_area AS
273                        (SELECT ST_SubDivide(ST_SimplifyPreserveTopology(
274                                               ST_Union(geometry), 0.00001), 128) as geometry
275                         FROM place_postcode WHERE geometry is not null)
276                     """)
277         cur.execute("CREATE INDEX ON _global_postcode_area USING gist(geometry)")
278     # Recompute the list of valid postcodes from placex.
279     with conn.cursor(name="placex_postcodes") as cur:
280         cur.execute("""
281             SELECT country_code, postcode, ST_X(centroid), ST_Y(centroid)
282               FROM (
283                 (SELECT country_code, address->'postcode' as postcode, centroid
284                   FROM placex WHERE address ? 'postcode')
285                 UNION
286                 (SELECT country_code, postcode, centroid
287                  FROM place_postcode WHERE geometry is null)
288               ) x
289               WHERE not postcode like '%,%' and not postcode like '%;%'
290                     AND NOT EXISTS(SELECT * FROM _global_postcode_area g
291                                    WHERE ST_Intersects(x.centroid, g.geometry))
292               ORDER BY country_code""")
293
294         collector = None
295
296         for country, postcode, x, y in cur:
297             if collector is None or country != collector.country:
298                 if collector is not None:
299                     collector.commit(conn, analyzer, project_dir)
300                 collector = _PostcodeCollector(country, matcher.get_matcher(country),
301                                                matcher.get_postcode_extent(country),
302                                                exclude=area_pcs[country])
303                 todo_countries.discard(country)
304             collector.add(postcode, x, y)
305
306         if collector is not None:
307             collector.commit(conn, analyzer, project_dir)
308
309     # Now handle any countries that are only in the postcode table.
310     for country in todo_countries:
311         fmt = matcher.get_matcher(country)
312         ext = matcher.get_postcode_extent(country)
313         _PostcodeCollector(country, fmt, ext,
314                            exclude=area_pcs[country]).commit(conn, analyzer, project_dir)
315
316     conn.execute("DROP TABLE IF EXISTS _global_postcode_area")
317
318
319 def can_compute(dsn: str) -> bool:
320     """ Check that the necessary tables exist so that postcodes can be computed.
321     """
322     with connect(dsn) as conn:
323         return table_exists(conn, 'place_postcode')