]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/clicmd/replication.py
Merge pull request #2475 from lonvia/catchup-mode
[nominatim.git] / nominatim / clicmd / replication.py
1 """
2 Implementation of the 'replication' sub-command.
3 """
4 import datetime as dt
5 import logging
6 import socket
7 import time
8
9 from nominatim.db import status
10 from nominatim.db.connection import connect
11 from nominatim.errors import UsageError
12
13 LOG = logging.getLogger()
14
15 # Do not repeat documentation of subcommand classes.
16 # pylint: disable=C0111
17 # Using non-top-level imports to make pyosmium optional for replication only.
18 # pylint: disable=E0012,C0415
19
20 class UpdateReplication:
21     """\
22     Update the database using an online replication service.
23
24     An OSM replication service is an online service that provides regular
25     updates (OSM diff files) for the planet or update they provide. The OSMF
26     provides the primary replication service for the full planet at
27     https://planet.osm.org/replication/ but there are other providers of
28     extracts of OSM data who provide such a service as well.
29
30     This sub-command allows to set up such a replication service and download
31     and import updates at regular intervals. You need to call '--init' once to
32     set up the process or whenever you change the replication configuration
33     parameters. Without any arguments, the sub-command will go into a loop and
34     continuously apply updates as they become available. Giving `--once` just
35     downloads and imports the next batch of updates.
36     """
37
38     @staticmethod
39     def add_args(parser):
40         group = parser.add_argument_group('Arguments for initialisation')
41         group.add_argument('--init', action='store_true',
42                            help='Initialise the update process')
43         group.add_argument('--no-update-functions', dest='update_functions',
44                            action='store_false',
45                            help="Do not update the trigger function to "
46                                 "support differential updates (EXPERT)")
47         group = parser.add_argument_group('Arguments for updates')
48         group.add_argument('--check-for-updates', action='store_true',
49                            help='Check if new updates are available and exit')
50         group.add_argument('--once', action='store_true',
51                            help="Download and apply updates only once. When "
52                                 "not set, updates are continuously applied")
53         group.add_argument('--catch-up', action='store_true',
54                            help="Download and apply updates until no new "
55                                 "data is available on the server")
56         group.add_argument('--no-index', action='store_false', dest='do_index',
57                            help=("Do not index the new data. Only usable "
58                                  "together with --once"))
59         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
60                            help='Size of cache to be used by osm2pgsql (in MB)')
61         group = parser.add_argument_group('Download parameters')
62         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
63                            help='Set timeout for file downloads')
64
65     @staticmethod
66     def _init_replication(args):
67         from ..tools import replication, refresh
68
69         LOG.warning("Initialising replication updates")
70         with connect(args.config.get_libpq_dsn()) as conn:
71             replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
72             if args.update_functions:
73                 LOG.warning("Create functions")
74                 refresh.create_functions(conn, args.config, True, False)
75         return 0
76
77
78     @staticmethod
79     def _check_for_updates(args):
80         from ..tools import replication
81
82         with connect(args.config.get_libpq_dsn()) as conn:
83             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
84
85     @staticmethod
86     def _report_update(batchdate, start_import, start_index):
87         def round_time(delta):
88             return dt.timedelta(seconds=int(delta.total_seconds()))
89
90         end = dt.datetime.now(dt.timezone.utc)
91         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
92                     round_time((start_index or end) - start_import),
93                     "Indexing: {} ".format(round_time(end - start_index))
94                     if start_index else '',
95                     round_time(end - start_import),
96                     round_time(end - batchdate))
97
98
99     @staticmethod
100     def _compute_update_interval(args):
101         if args.catch_up:
102             return 0
103
104         update_interval = args.config.get_int('REPLICATION_UPDATE_INTERVAL')
105         # Sanity check to not overwhelm the Geofabrik servers.
106         if 'download.geofabrik.de' in args.config.REPLICATION_URL\
107            and update_interval < 86400:
108             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
109                       "Please check install documentation "
110                       "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
111                       "setting-up-the-update-process).")
112             raise UsageError("Invalid replication update interval setting.")
113
114         return update_interval
115
116
117     @staticmethod
118     def _update(args):
119         from ..tools import replication
120         from ..indexer.indexer import Indexer
121         from ..tokenizer import factory as tokenizer_factory
122
123         update_interval = UpdateReplication._compute_update_interval(args)
124
125         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
126         params.update(base_url=args.config.REPLICATION_URL,
127                       update_interval=update_interval,
128                       import_file=args.project_dir / 'osmosischange.osc',
129                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
130                       indexed_only=not args.once)
131
132         if not args.once:
133             if not args.do_index:
134                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
135                 raise UsageError("Bad argument '--no-index'.")
136             recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
137
138         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
139
140         while True:
141             with connect(args.config.get_libpq_dsn()) as conn:
142                 start = dt.datetime.now(dt.timezone.utc)
143                 state = replication.update(conn, params)
144                 if state is not replication.UpdateState.NO_CHANGES:
145                     status.log_status(conn, start, 'import')
146                 batchdate, _, _ = status.get_status(conn)
147                 conn.commit()
148
149             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
150                 index_start = dt.datetime.now(dt.timezone.utc)
151                 indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
152                                   args.threads or 1)
153                 indexer.index_full(analyse=False)
154
155                 with connect(args.config.get_libpq_dsn()) as conn:
156                     status.set_indexed(conn, True)
157                     status.log_status(conn, index_start, 'index')
158                     conn.commit()
159             else:
160                 index_start = None
161
162             if state is replication.UpdateState.NO_CHANGES and \
163                args.catch_up or update_interval > 40*60:
164                 while indexer.has_pending():
165                     indexer.index_full(analyse=False)
166
167             if LOG.isEnabledFor(logging.WARNING):
168                 UpdateReplication._report_update(batchdate, start, index_start)
169
170             if args.once or (args.catch_up and state is replication.UpdateState.NO_CHANGES):
171                 break
172
173             if state is replication.UpdateState.NO_CHANGES:
174                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
175                 time.sleep(recheck_interval)
176
177
178     @staticmethod
179     def run(args):
180         socket.setdefaulttimeout(args.socket_timeout)
181
182         if args.init:
183             return UpdateReplication._init_replication(args)
184
185         if args.check_for_updates:
186             return UpdateReplication._check_for_updates(args)
187
188         UpdateReplication._update(args)
189         return 0