1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Implementation of the 'replication' sub-command.
 
  10 from typing import Optional
 
  17 from nominatim.db import status
 
  18 from nominatim.db.connection import connect
 
  19 from nominatim.errors import UsageError
 
  20 from nominatim.clicmd.args import NominatimArgs
 
  22 LOG = logging.getLogger()
 
  24 # Do not repeat documentation of subcommand classes.
 
  25 # pylint: disable=C0111
 
  26 # Using non-top-level imports to make pyosmium optional for replication only.
 
  27 # pylint: disable=C0415
 
  29 class UpdateReplication:
 
  31     Update the database using an online replication service.
 
  33     An OSM replication service is an online service that provides regular
 
  34     updates (OSM diff files) for the planet or update they provide. The OSMF
 
  35     provides the primary replication service for the full planet at
 
  36     https://planet.osm.org/replication/ but there are other providers of
 
  37     extracts of OSM data who provide such a service as well.
 
  39     This sub-command allows to set up such a replication service and download
 
  40     and import updates at regular intervals. You need to call '--init' once to
 
  41     set up the process or whenever you change the replication configuration
 
  42     parameters. Without any arguments, the sub-command will go into a loop and
 
  43     continuously apply updates as they become available. Giving `--once` just
 
  44     downloads and imports the next batch of updates.
 
  47     def add_args(self, parser: argparse.ArgumentParser) -> None:
 
  48         group = parser.add_argument_group('Arguments for initialisation')
 
  49         group.add_argument('--init', action='store_true',
 
  50                            help='Initialise the update process')
 
  51         group.add_argument('--no-update-functions', dest='update_functions',
 
  53                            help="Do not update the trigger function to "
 
  54                                 "support differential updates (EXPERT)")
 
  55         group = parser.add_argument_group('Arguments for updates')
 
  56         group.add_argument('--check-for-updates', action='store_true',
 
  57                            help='Check if new updates are available and exit')
 
  58         group.add_argument('--once', action='store_true',
 
  59                            help="Download and apply updates only once. When "
 
  60                                 "not set, updates are continuously applied")
 
  61         group.add_argument('--catch-up', action='store_true',
 
  62                            help="Download and apply updates until no new "
 
  63                                 "data is available on the server")
 
  64         group.add_argument('--no-index', action='store_false', dest='do_index',
 
  65                            help=("Do not index the new data. Only usable "
 
  66                                  "together with --once"))
 
  67         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
 
  68                            help='Size of cache to be used by osm2pgsql (in MB)')
 
  69         group = parser.add_argument_group('Download parameters')
 
  70         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
 
  71                            help='Set timeout for file downloads')
 
  74     def _init_replication(self, args: NominatimArgs) -> int:
 
  75         from ..tools import replication, refresh
 
  77         LOG.warning("Initialising replication updates")
 
  78         with connect(args.config.get_libpq_dsn()) as conn:
 
  79             replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
 
  80             if args.update_functions:
 
  81                 LOG.warning("Create functions")
 
  82                 refresh.create_functions(conn, args.config, True, False)
 
  86     def _check_for_updates(self, args: NominatimArgs) -> int:
 
  87         from ..tools import replication
 
  89         with connect(args.config.get_libpq_dsn()) as conn:
 
  90             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
 
  93     def _report_update(self, batchdate: dt.datetime,
 
  94                        start_import: dt.datetime,
 
  95                        start_index: Optional[dt.datetime]) -> None:
 
  96         def round_time(delta: dt.timedelta) -> dt.timedelta:
 
  97             return dt.timedelta(seconds=int(delta.total_seconds()))
 
  99         end = dt.datetime.now(dt.timezone.utc)
 
 100         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
 
 101                     round_time((start_index or end) - start_import),
 
 102                     f"Indexing: {round_time(end - start_index)} " if start_index else '',
 
 103                     round_time(end - start_import),
 
 104                     round_time(end - batchdate))
 
 107     def _compute_update_interval(self, args: NominatimArgs) -> int:
 
 111         update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
 
 112         # Sanity check to not overwhelm the Geofabrik servers.
 
 113         if 'download.geofabrik.de' in args.config.REPLICATION_URL\
 
 114            and update_interval < 86400:
 
 115             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
 
 116                       "Please check install documentation "
 
 117                       "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
 
 118                       "setting-up-the-update-process).")
 
 119             raise UsageError("Invalid replication update interval setting.")
 
 121         return update_interval
 
 124     def _update(self, args: NominatimArgs) -> None:
 
 125         # pylint: disable=too-many-locals
 
 126         from ..tools import replication
 
 127         from ..indexer.indexer import Indexer
 
 128         from ..tokenizer import factory as tokenizer_factory
 
 130         update_interval = self._compute_update_interval(args)
 
 132         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
 
 133         params.update(base_url=args.config.REPLICATION_URL,
 
 134                       update_interval=update_interval,
 
 135                       import_file=args.project_dir / 'osmosischange.osc',
 
 136                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
 
 137                       indexed_only=not args.once)
 
 140             if not args.do_index:
 
 141                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
 
 142                 raise UsageError("Bad argument '--no-index'.")
 
 143             recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
 
 145         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
 
 146         indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
 
 149             with connect(args.config.get_libpq_dsn()) as conn:
 
 150                 start = dt.datetime.now(dt.timezone.utc)
 
 151                 state = replication.update(conn, params)
 
 152                 if state is not replication.UpdateState.NO_CHANGES:
 
 153                     status.log_status(conn, start, 'import')
 
 154                 batchdate, _, _ = status.get_status(conn)
 
 157             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
 
 158                 index_start = dt.datetime.now(dt.timezone.utc)
 
 159                 indexer.index_full(analyse=False)
 
 161                 with connect(args.config.get_libpq_dsn()) as conn:
 
 162                     status.set_indexed(conn, True)
 
 163                     status.log_status(conn, index_start, 'index')
 
 168             if state is replication.UpdateState.NO_CHANGES and \
 
 169                args.catch_up or update_interval > 40*60:
 
 170                 while indexer.has_pending():
 
 171                     indexer.index_full(analyse=False)
 
 173             if LOG.isEnabledFor(logging.WARNING):
 
 174                 assert batchdate is not None
 
 175                 self._report_update(batchdate, start, index_start)
 
 177             if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
 
 180             if state is replication.UpdateState.NO_CHANGES:
 
 181                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
 
 182                 time.sleep(recheck_interval)
 
 185     def run(self, args: NominatimArgs) -> int:
 
 186         socket.setdefaulttimeout(args.socket_timeout)
 
 189             return self._init_replication(args)
 
 191         if args.check_for_updates:
 
 192             return self._check_for_updates(args)