]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/clicmd/replication.py
run Tiger import with parallel threads per default
[nominatim.git] / nominatim / clicmd / replication.py
1 """
2 Implementation of the 'replication' sub-command.
3 """
4 import datetime as dt
5 import logging
6 import socket
7 import time
8
9 from nominatim.db import status
10 from nominatim.db.connection import connect
11 from nominatim.errors import UsageError
12
13 LOG = logging.getLogger()
14
15 # Do not repeat documentation of subcommand classes.
16 # pylint: disable=C0111
17 # Using non-top-level imports to make pyosmium optional for replication only.
18 # pylint: disable=E0012,C0415
19
20 class UpdateReplication:
21     """\
22     Update the database using an online replication service.
23
24     An OSM replication service is an online service that provides regular
25     updates (OSM diff files) for the planet or update they provide. The OSMF
26     provides the primary replication service for the full planet at
27     https://planet.osm.org/replication/ but there are other providers of
28     extracts of OSM data who provide such a service as well.
29
30     This sub-command allows to set up such a replication service and download
31     and import updates at regular intervals. You need to call '--init' once to
32     set up the process or whenever you change the replication configuration
33     parameters. Without any arguments, the sub-command will go into a loop and
34     continuously apply updates as they become available. Giving `--once` just
35     downloads and imports the next batch of updates.
36     """
37
38     @staticmethod
39     def add_args(parser):
40         group = parser.add_argument_group('Arguments for initialisation')
41         group.add_argument('--init', action='store_true',
42                            help='Initialise the update process')
43         group.add_argument('--no-update-functions', dest='update_functions',
44                            action='store_false',
45                            help=("Do not update the trigger function to "
46                                  "support differential updates."))
47         group = parser.add_argument_group('Arguments for updates')
48         group.add_argument('--check-for-updates', action='store_true',
49                            help='Check if new updates are available and exit')
50         group.add_argument('--once', action='store_true',
51                            help=("Download and apply updates only once. When "
52                                  "not set, updates are continuously applied"))
53         group.add_argument('--no-index', action='store_false', dest='do_index',
54                            help=("Do not index the new data. Only usable "
55                                  "together with --once"))
56         group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
57                            help='Size of cache to be used by osm2pgsql (in MB)')
58         group = parser.add_argument_group('Download parameters')
59         group.add_argument('--socket-timeout', dest='socket_timeout', type=int, default=60,
60                            help='Set timeout for file downloads')
61
62     @staticmethod
63     def _init_replication(args):
64         from ..tools import replication, refresh
65
66         LOG.warning("Initialising replication updates")
67         with connect(args.config.get_libpq_dsn()) as conn:
68             replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
69             if args.update_functions:
70                 LOG.warning("Create functions")
71                 refresh.create_functions(conn, args.config, True, False)
72         return 0
73
74
75     @staticmethod
76     def _check_for_updates(args):
77         from ..tools import replication
78
79         with connect(args.config.get_libpq_dsn()) as conn:
80             return replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
81
82     @staticmethod
83     def _report_update(batchdate, start_import, start_index):
84         def round_time(delta):
85             return dt.timedelta(seconds=int(delta.total_seconds()))
86
87         end = dt.datetime.now(dt.timezone.utc)
88         LOG.warning("Update completed. Import: %s. %sTotal: %s. Remaining backlog: %s.",
89                     round_time((start_index or end) - start_import),
90                     "Indexing: {} ".format(round_time(end - start_index))
91                     if start_index else '',
92                     round_time(end - start_import),
93                     round_time(end - batchdate))
94
95     @staticmethod
96     def _update(args):
97         from ..tools import replication
98         from ..indexer.indexer import Indexer
99         from ..tokenizer import factory as tokenizer_factory
100
101         params = args.osm2pgsql_options(default_cache=2000, default_threads=1)
102         params.update(base_url=args.config.REPLICATION_URL,
103                       update_interval=args.config.get_int('REPLICATION_UPDATE_INTERVAL'),
104                       import_file=args.project_dir / 'osmosischange.osc',
105                       max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
106                       indexed_only=not args.once)
107
108         # Sanity check to not overwhelm the Geofabrik servers.
109         if 'download.geofabrik.de' in params['base_url']\
110            and params['update_interval'] < 86400:
111             LOG.fatal("Update interval too low for download.geofabrik.de.\n"
112                       "Please check install documentation "
113                       "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
114                       "setting-up-the-update-process).")
115             raise UsageError("Invalid replication update interval setting.")
116
117         if not args.once:
118             if not args.do_index:
119                 LOG.fatal("Indexing cannot be disabled when running updates continuously.")
120                 raise UsageError("Bad argument '--no-index'.")
121             recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
122
123         tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
124
125         while True:
126             with connect(args.config.get_libpq_dsn()) as conn:
127                 start = dt.datetime.now(dt.timezone.utc)
128                 state = replication.update(conn, params)
129                 if state is not replication.UpdateState.NO_CHANGES:
130                     status.log_status(conn, start, 'import')
131                 batchdate, _, _ = status.get_status(conn)
132                 conn.commit()
133
134             if state is not replication.UpdateState.NO_CHANGES and args.do_index:
135                 index_start = dt.datetime.now(dt.timezone.utc)
136                 indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
137                                   args.threads or 1)
138                 indexer.index_boundaries(0, 30)
139                 indexer.index_by_rank(0, 30)
140
141                 with connect(args.config.get_libpq_dsn()) as conn:
142                     status.set_indexed(conn, True)
143                     status.log_status(conn, index_start, 'index')
144                     conn.commit()
145             else:
146                 index_start = None
147
148             if LOG.isEnabledFor(logging.WARNING):
149                 UpdateReplication._report_update(batchdate, start, index_start)
150
151             if args.once:
152                 break
153
154             if state is replication.UpdateState.NO_CHANGES:
155                 LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
156                 time.sleep(recheck_interval)
157
158
159     @staticmethod
160     def run(args):
161         socket.setdefaulttimeout(args.socket_timeout)
162
163         if args.init:
164             return UpdateReplication._init_replication(args)
165
166         if args.check_for_updates:
167             return UpdateReplication._check_for_updates(args)
168
169         UpdateReplication._update(args)
170         return 0