1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions for updating a database from a replication source.
 
  10 from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
 
  11 from contextlib import contextmanager
 
  17 import urllib.request as urlrequest
 
  20 from nominatim.db import status
 
  21 from nominatim.db.connection import Connection
 
  22 from nominatim.tools.exec_utils import run_osm2pgsql
 
  23 from nominatim.errors import UsageError
 
  26     from osmium.replication.server import ReplicationServer
 
  27     from osmium import WriteHandler
 
  28     from osmium import version as pyo_version
 
  29 except ImportError as exc:
 
  30     logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
 
  31                                  "To install pyosmium via pip: pip3 install osmium")
 
  32     raise UsageError("replication tools not available") from exc
 
  34 LOG = logging.getLogger()
 
  36 def init_replication(conn: Connection, base_url: str,
 
  37                      socket_timeout: int = 60) -> None:
 
  38     """ Set up replication for the server at the given base URL.
 
  40     LOG.info("Using replication source: %s", base_url)
 
  41     date = status.compute_database_date(conn)
 
  43     # margin of error to make sure we get all data
 
  44     date -= dt.timedelta(hours=3)
 
  46     with _make_replication_server(base_url, socket_timeout) as repl:
 
  47         seq = repl.timestamp_to_sequence(date)
 
  50         LOG.fatal("Cannot reach the configured replication service '%s'.\n"
 
  51                   "Does the URL point to a directory containing OSM update data?",
 
  53         raise UsageError("Failed to reach replication service")
 
  55     status.set_status(conn, date=date, seq=seq)
 
  57     LOG.warning("Updates initialised at sequence %s (%s)", seq, date)
 
  60 def check_for_updates(conn: Connection, base_url: str,
 
  61                       socket_timeout: int = 60) -> int:
 
  62     """ Check if new data is available from the replication service at the
 
  65     _, seq, _ = status.get_status(conn)
 
  68         LOG.error("Replication not set up. "
 
  69                   "Please run 'nominatim replication --init' first.")
 
  72     with _make_replication_server(base_url, socket_timeout) as repl:
 
  73         state = repl.get_state_info()
 
  76         LOG.error("Cannot get state for URL %s.", base_url)
 
  79     if state.sequence <= seq:
 
  80         LOG.warning("Database is up to date.")
 
  83     LOG.warning("New data available (%i => %i).", seq, state.sequence)
 
  86 class UpdateState(Enum):
 
  87     """ Possible states after an update has run.
 
  95 def update(conn: Connection, options: MutableMapping[str, Any],
 
  96            socket_timeout: int = 60) -> UpdateState:
 
  97     """ Update database from the next batch of data. Returns the state of
 
  98         updates according to `UpdateState`.
 
 100     startdate, startseq, indexed = status.get_status(conn)
 
 103         LOG.error("Replication not set up. "
 
 104                   "Please run 'nominatim replication --init' first.")
 
 105         raise UsageError("Replication not set up.")
 
 107     assert startdate is not None
 
 109     if not indexed and options['indexed_only']:
 
 110         LOG.info("Skipping update. There is data that needs indexing.")
 
 111         return UpdateState.MORE_PENDING
 
 113     last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
 
 114     update_interval = dt.timedelta(seconds=options['update_interval'])
 
 115     if last_since_update < update_interval:
 
 116         duration = (update_interval - last_since_update).seconds
 
 117         LOG.warning("Sleeping for %s sec before next update.", duration)
 
 120     if options['import_file'].exists():
 
 121         options['import_file'].unlink()
 
 123     # Read updates into file.
 
 124     with _make_replication_server(options['base_url'], socket_timeout) as repl:
 
 125         outhandler = WriteHandler(str(options['import_file']))
 
 126         endseq = repl.apply_diffs(outhandler, startseq + 1,
 
 127                                   max_size=options['max_diff_size'] * 1024)
 
 131             return UpdateState.NO_CHANGES
 
 133         run_osm2pgsql_updates(conn, options)
 
 135         # Write the current status to the file
 
 136         endstate = repl.get_state_info(endseq)
 
 137         status.set_status(conn, endstate.timestamp if endstate else None,
 
 138                           seq=endseq, indexed=False)
 
 140     return UpdateState.UP_TO_DATE
 
 143 def run_osm2pgsql_updates(conn: Connection, options: MutableMapping[str, Any]) -> None:
 
 144     """ Run osm2pgsql in append mode.
 
 146     # Remove any stale deletion marks.
 
 147     with conn.cursor() as cur:
 
 148         cur.execute('TRUNCATE place_to_be_deleted')
 
 151     # Consume updates with osm2pgsql.
 
 152     options['append'] = True
 
 153     options['disable_jit'] = conn.server_version_tuple() >= (11, 0)
 
 154     run_osm2pgsql(options)
 
 157     with conn.cursor() as cur:
 
 158         cur.execute('SELECT flush_deleted_places()')
 
 162 def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
 
 163     """ Returns a ReplicationServer in form of a context manager.
 
 165         Creates a light wrapper around older versions of pyosmium that did
 
 166         not support the context manager interface.
 
 168     if hasattr(ReplicationServer, '__enter__'):
 
 169         # Patches the open_url function for pyosmium >= 3.2
 
 170         # where the socket timeout is no longer respected.
 
 171         def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
 
 172             """ Download a resource from the given URL and return a byte sequence
 
 175             headers = {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}
 
 177             if self.session is not None:
 
 178                 return self.session.get(url.get_full_url(),
 
 179                                        headers=headers, timeout=timeout or None,
 
 183             def _get_url_with_session() -> Iterator[requests.Response]:
 
 184                 with requests.Session() as session:
 
 185                     request = session.get(url.get_full_url(),
 
 186                                           headers=headers, timeout=timeout or None,
 
 190             return _get_url_with_session()
 
 192         repl = ReplicationServer(url)
 
 193         setattr(repl, 'open_url', types.MethodType(patched_open_url, repl))
 
 195         return cast(ContextManager[ReplicationServer], repl)
 
 198     def get_cm() -> Generator[ReplicationServer, None, None]:
 
 199         yield ReplicationServer(url)