From c77877a93401dd2f87e3caefb7aa6f04d05f7c95 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Sun, 17 Jan 2021 20:05:41 +0100 Subject: [PATCH] implementation of 'nominatim index' --- nominatim/cli.py | 31 +++++++++++++++++++++++++++++-- nominatim/db/async_connection.py | 20 +++++--------------- nominatim/indexer/indexer.py | 29 +++++++++++++++-------------- 3 files changed, 49 insertions(+), 31 deletions(-) diff --git a/nominatim/cli.py b/nominatim/cli.py index 8d4071db..acb6839f 100644 --- a/nominatim/cli.py +++ b/nominatim/cli.py @@ -11,6 +11,17 @@ from pathlib import Path from .config import Configuration from .admin.exec_utils import run_legacy_script +from .indexer.indexer import Indexer + +def _num_system_cpus(): + try: + cpus = len(os.sched_getaffinity(0)) + except NotImplementedError: + cpus = None + + return cpus or os.cpu_count() + + class CommandlineParser: """ Wraps some of the common functions for parsing the command line and setting up subcommands. @@ -297,11 +308,27 @@ class UpdateIndex: @staticmethod def add_args(parser): - pass + group = parser.add_argument_group('Filter arguments') + group.add_argument('--boundaries-only', action='store_true', + help="""Index only administrative boundaries.""") + group.add_argument('--no-boundaries', action='store_true', + help="""Index everything except administrative boundaries.""") + group.add_argument('--minrank', '-r', type=int, metavar='RANK', default=0, + help='Minimum/starting rank') + group.add_argument('--maxrank', '-R', type=int, metavar='RANK', default=30, + help='Maximum/finishing rank') @staticmethod def run(args): - return run_legacy_script('update.php', '--index', nominatim_env=args) + indexer = Indexer(args.config.get_libpq_dsn(), + args.threads or _num_system_cpus() or 1) + + if not args.no_boundaries: + indexer.index_boundaries(args.minrank, args.maxrank) + if not args.boundaries_only: + indexer.index_by_rank(args.minrank, args.maxrank) + + return 0 class UpdateRefresh: diff --git 
a/nominatim/db/async_connection.py b/nominatim/db/async_connection.py index 85b84431..45e83664 100644 --- a/nominatim/db/async_connection.py +++ b/nominatim/db/async_connection.py @@ -11,26 +11,14 @@ from psycopg2.extras import wait_select LOG = logging.getLogger() -def make_connection(options, asynchronous=False): - """ Create a psycopg2 connection from the given options. - """ - params = {'dbname' : options.dbname, - 'user' : options.user, - 'password' : options.password, - 'host' : options.host, - 'port' : options.port, - 'async' : asynchronous} - - return psycopg2.connect(**params) - class DBConnection: """ A single non-blocking database connection. """ - def __init__(self, options): + def __init__(self, dsn): self.current_query = None self.current_params = None - self.options = options + self.dsn = dsn self.conn = None self.cursor = None @@ -46,7 +34,9 @@ class DBConnection: self.cursor.close() self.conn.close() - self.conn = make_connection(self.options, asynchronous=True) + # Use a dict to hand in the parameters because async is a reserved + # word in Python3. + self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True}) self.wait() self.cursor = self.conn.cursor() diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index 52046456..d86303c4 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -5,8 +5,10 @@ Main work horse for indexing (computing addresses) the database. import logging import select +import psycopg2 + from .progress import ProgressLogger -from db.async_connection import DBConnection, make_connection +from ..db.async_connection import DBConnection LOG = logging.getLogger() @@ -94,34 +96,33 @@ class Indexer: """ Main indexing routine. 
""" - def __init__(self, opts): - self.minrank = max(1, opts.minrank) - self.maxrank = min(30, opts.maxrank) - self.conn = make_connection(opts) - self.threads = [DBConnection(opts) for _ in range(opts.threads)] + def __init__(self, dsn, num_threads): + self.conn = psycopg2.connect(dsn) + self.threads = [DBConnection(dsn) for _ in range(num_threads)] - def index_boundaries(self): + def index_boundaries(self, minrank, maxrank): LOG.warning("Starting indexing boundaries using %s threads", len(self.threads)) - for rank in range(max(self.minrank, 5), min(self.maxrank, 26)): + for rank in range(max(minrank, 5), min(maxrank, 26)): self.index(BoundaryRunner(rank)) - def index_by_rank(self): + def index_by_rank(self, minrank, maxrank): """ Run classic indexing by rank. """ + maxrank = min(maxrank, 30) LOG.warning("Starting indexing rank (%i to %i) using %i threads", - self.minrank, self.maxrank, len(self.threads)) + minrank, maxrank, len(self.threads)) - for rank in range(max(1, self.minrank), self.maxrank): + for rank in range(max(1, minrank), maxrank): self.index(RankRunner(rank)) - if self.maxrank == 30: + if maxrank == 30: self.index(RankRunner(0)) self.index(InterpolationRunner(), 20) - self.index(RankRunner(self.maxrank), 20) + self.index(RankRunner(30), 20) else: - self.index(RankRunner(self.maxrank)) + self.index(RankRunner(maxrank)) def index(self, obj, batch=1): """ Index a single rank or table. `obj` describes the SQL to use -- 2.45.1