2 Implementation of the 'replication' sub-command.
 
   9 from nominatim.db import status
 
  10 from nominatim.db.connection import connect
 
  11 from nominatim.errors import UsageError
 
  13 LOG = logging.getLogger()
 
  15 # Do not repeat documentation of subcommand classes.
 
  16 # pylint: disable=C0111
 
  17 # Using non-top-level imports to make pyosmium optional for replication only.
 
  18 # pylint: disable=E0012,C0415
 
  20 class UpdateReplication:
 
  22     Update the database using an online replication service.
 
  24     An OSM replication service is an online service that provides regular
 
  25     updates (OSM diff files) for the planet or update they provide. The OSMF
 
  26     provides the primary replication service for the full planet at
 
  27     https://planet.osm.org/replication/ but there are other providers of
 
  28     extracts of OSM data who provide such a service as well.
 
  30     This sub-command allows to set up such a replication service and download
 
  31     and import updates at regular intervals. You need to call '--init' once to
 
  32     set up the process or whenever you change the replication configuration
 
  33     parameters. Without any arguments, the sub-command will go into a loop and
 
  34     continuously apply updates as they become available. Giving `--once` just
 
  35     downloads and imports the next batch of updates.
 
  40         group = parser.add_argument_group('Arguments for initialisation')
 
  41         group.add_argument('--init', action='store_true',
 
  42                            help='Initialise the update process')
 
  43         group.add_argument('--no-update-functions', dest='update_functions',
 
  45                            help="Do not update the trigger function to "
 
  46                                 "support differential updates (EXPERT)")
 
  47         group = parser.add_argument_group('Arguments for updates')
 
  48         group.add_argument('--check-for-updates', action='store_true',
 
  49                            help='Check if new updates are available and exit')
 
  50         group.add_argument('--once', action='store_true',
 
  51                            help="Download and apply updates only once. When "
 
  52                                 "not set, updates are continuously applied")
 
  53         group.add_argument('--catch-up', action='store_true',
 
  54                            help="Download and apply updates until no new "
 
  55                                 "data is available on the server")
 
  56         group.add_argument('--no-index', action='store_false', dest='do_index',
 
  57                            help=("Do not index the new data. Only usable "
 
  58                                  "together with --once"))
 
  59         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
 
  60                            help='Size of cache to be used by osm2pgsql (in MB)')
 
  61         group = parser.add_argument_group('Download parameters')
 
  62         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
 
  63                            help='Set timeout for file downloads')
 
  66     def _init_replication(args):
 
  67         from ..tools import replication, refresh
 
  69         LOG.warning("Initialising replication updates")
 
  70         with connect(args.config.get_libpq_dsn()) as conn:
 
  71             replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
 
  72             if args.update_functions:
 
  73                 LOG.warning("Create functions")
 
  74                 refresh.create_functions(conn, args.config, True, False)
 
  79     def _check_for_updates(args):
 
  80         from ..tools import replication
 
  82         with connect(args.config.get_libpq_dsn()) as conn:
 
  83             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
 
  86     def _report_update(batchdate, start_import, start_index):
 
  87         def round_time(delta):
 
  88             return dt.timedelta(seconds=int(delta.total_seconds()))
 
  90         end = dt.datetime.now(dt.timezone.utc)
 
  91         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
 
  92                     round_time((start_index or end) - start_import),
 
  93                     "Indexing: {} ".format(round_time(end - start_index))
 
  94                     if start_index else '',
 
  95                     round_time(end - start_import),
 
  96                     round_time(end - batchdate))
 
 100     def _compute_update_interval(args):
 
 104         update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
 
 105         # Sanity check to not overwhelm the Geofabrik servers.
 
 106         if 'download.geofabrik.de' in args.config.REPLICATION_URL\
 
 107            and update_interval < 86400:
 
 108             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
 
 109                       "Please check install documentation "
 
 110                       "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
 
 111                       "setting-up-the-update-process).")
 
 112             raise UsageError("Invalid replication update interval setting.")
 
 114         return update_interval
 
 119         from ..tools import replication
 
 120         from ..indexer.indexer import Indexer
 
 121         from ..tokenizer import factory as tokenizer_factory
 
 123         update_interval = UpdateReplication._compute_update_interval(args)
 
 125         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
 
 126         params.update(base_url=args.config.REPLICATION_URL,
 
 127                       update_interval=update_interval,
 
 128                       import_file=args.project_dir / 'osmosischange.osc',
 
 129                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
 
 130                       indexed_only=not args.once)
 
 133             if not args.do_index:
 
 134                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
 
 135                 raise UsageError("Bad argument '--no-index'.")
 
 136             recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
 
 138         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
 
 139         indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
 
 142             with connect(args.config.get_libpq_dsn()) as conn:
 
 143                 start = dt.datetime.now(dt.timezone.utc)
 
 144                 state = replication.update(conn, params)
 
 145                 if state is not replication.UpdateState.NO_CHANGES:
 
 146                     status.log_status(conn, start, 'import')
 
 147                 batchdate, _, _ = status.get_status(conn)
 
 150             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
 
 151                 index_start = dt.datetime.now(dt.timezone.utc)
 
 152                 indexer.index_full(analyse=False)
 
 154                 with connect(args.config.get_libpq_dsn()) as conn:
 
 155                     status.set_indexed(conn, True)
 
 156                     status.log_status(conn, index_start, 'index')
 
 161             if state is replication.UpdateState.NO_CHANGES and \
 
 162                args.catch_up or update_interval > 40*60:
 
 163                 while indexer.has_pending():
 
 164                     indexer.index_full(analyse=False)
 
 166             if LOG.isEnabledFor(logging.WARNING):
 
 167                 UpdateReplication._report_update(batchdate, start, index_start)
 
 169             if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
 
 172             if state is replication.UpdateState.NO_CHANGES:
 
 173                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
 
 174                 time.sleep(recheck_interval)
 
 179         socket.setdefaulttimeout(args.socket_timeout)
 
 182             return UpdateReplication._init_replication(args)
 
 184         if args.check_for_updates:
 
 185             return UpdateReplication._check_for_updates(args)
 
 187         UpdateReplication._update(args)