]> git.openstreetmap.org Git - nominatim.git/commitdiff
port replication update function to python
authorSarah Hoffmann <lonvia@denofr.de>
Sat, 30 Jan 2021 14:50:34 +0000 (15:50 +0100)
committerSarah Hoffmann <lonvia@denofr.de>
Sat, 30 Jan 2021 14:50:34 +0000 (15:50 +0100)
13 files changed:
lib/admin/update.php
nominatim/cli.py
nominatim/config.py
nominatim/db/status.py
nominatim/tools/exec_utils.py
nominatim/tools/replication.py
test/python/conftest.py
test/python/test_cli.py
test/python/test_config.py
test/python/test_db_status.py
test/python/test_tools_exec_utils.py
test/python/test_tools_replication.py
utils/osm_file_date.py [deleted file]

index a2ff6158533d09f1b8d97a6a3ee53f329127b1fe..fba5300b07ee4ec0d5791d01895bfe16eb1310e3 100644 (file)
@@ -111,9 +111,6 @@ if ($aResult['verbose']) {
     $oNominatimCmd->addParams('--verbose');
 }
 
-$sPyosmiumBin = getSetting('PYOSMIUM_BINARY');
-$sBaseURL = getSetting('REPLICATION_URL');
-
 
 if ($aResult['init-updates']) {
     $oCmd = (clone($oNominatimCmd))->addParams('replication', '--init');
@@ -203,7 +200,10 @@ if ($aResult['recompute-word-counts']) {
 }
 
 if ($aResult['index']) {
-    (clone $oNominatimCmd)->addParams('index', '--minrank', $aResult['index-rank'])->run();
+    (clone $oNominatimCmd)
+        ->addParams('index', '--minrank', $aResult['index-rank'])
+        ->addParams('--threads', $aResult['index-instances'])
+        ->run();
 }
 
 if ($aResult['update-address-levels']) {
@@ -228,146 +228,17 @@ if ($aResult['recompute-importance']) {
 }
 
 if ($aResult['import-osmosis'] || $aResult['import-osmosis-all']) {
-    //
-    if (strpos($sBaseURL, 'download.geofabrik.de') !== false && getSetting('REPLICATION_UPDATE_INTERVAL') < 86400) {
-        fail('Error: Update interval too low for download.geofabrik.de. ' .
-             "Please check install documentation (https://nominatim.org/release-docs/latest/admin/Import-and-Update#setting-up-the-update-process)\n");
+    $oCmd = (clone($oNominatimCmd))
+              ->addParams('replication')
+              ->addParams('--threads', $aResult['index-instances']);
+
+    if (!$aResult['import-osmosis-all']) {
+        $oCmd->addParams('--once');
     }
 
-    $sImportFile = CONST_InstallDir.'/osmosischange.osc';
-
-    $oCMDDownload = (new \Nominatim\Shell($sPyosmiumBin))
-                    ->addParams('--server', $sBaseURL)
-                    ->addParams('--outfile', $sImportFile)
-                    ->addParams('--size', getSetting('REPLICATION_MAX_DIFF'));
-
-    $oCMDImport = (clone $oOsm2pgsqlCmd)->addParams($sImportFile);
-
-    while (true) {
-        $fStartTime = time();
-        $aLastState = $oDB->getRow('SELECT *, EXTRACT (EPOCH FROM lastimportdate) as unix_ts FROM import_status');
-
-        if (!$aLastState['sequence_id']) {
-            echo "Updates not set up. Please run ./utils/update.php --init-updates.\n";
-            exit(1);
-        }
-
-        echo 'Currently at sequence '.$aLastState['sequence_id'].' ('.$aLastState['lastimportdate'].') - '.$aLastState['indexed']." indexed\n";
-
-        $sBatchEnd = $aLastState['lastimportdate'];
-        $iEndSequence = $aLastState['sequence_id'];
-
-        if ($aLastState['indexed']) {
-            // Sleep if the update interval has not yet been reached.
-            $fNextUpdate = $aLastState['unix_ts'] + getSetting('REPLICATION_UPDATE_INTERVAL');
-            if ($fNextUpdate > $fStartTime) {
-                $iSleepTime = $fNextUpdate - $fStartTime;
-                echo "Waiting for next update for $iSleepTime sec.";
-                sleep($iSleepTime);
-            }
-
-            // Download the next batch of changes.
-            do {
-                $fCMDStartTime = time();
-                $iNextSeq = (int) $aLastState['sequence_id'];
-                unset($aOutput);
-
-                $oCMD = (clone $oCMDDownload)->addParams('--start-id', $iNextSeq);
-                echo $oCMD->escapedCmd()."\n";
-                if (file_exists($sImportFile)) {
-                    unlink($sImportFile);
-                }
-                exec($oCMD->escapedCmd(), $aOutput, $iResult);
-
-                if ($iResult == 3) {
-                    $sSleep = getSetting('REPLICATION_RECHECK_INTERVAL');
-                    echo 'No new updates. Sleeping for '.$sSleep." sec.\n";
-                    sleep($sSleep);
-                } elseif ($iResult != 0) {
-                    echo 'ERROR: updates failed.';
-                    exit($iResult);
-                } else {
-                    $iEndSequence = (int)$aOutput[0];
-                }
-            } while ($iResult);
-
-            // get the newest object from the diff file
-            $sBatchEnd = 0;
-            $iRet = 0;
-            $oCMD = new \Nominatim\Shell(CONST_BinDir.'/osm_file_date.py', $sImportFile);
-            exec($oCMD->escapedCmd(), $sBatchEnd, $iRet);
-            if ($iRet == 5) {
-                echo "Diff file is empty. skipping import.\n";
-                if (!$aResult['import-osmosis-all']) {
-                    exit(0);
-                } else {
-                    continue;
-                }
-            }
-            if ($iRet != 0) {
-                fail('Error getting date from diff file.');
-            }
-            $sBatchEnd = $sBatchEnd[0];
-
-            // Import the file
-            $fCMDStartTime = time();
-
-
-            echo $oCMDImport->escapedCmd()."\n";
-            unset($sJunk);
-            $iErrorLevel = $oCMDImport->run();
-            if ($iErrorLevel) {
-                echo "Error executing osm2pgsql: $iErrorLevel\n";
-                exit($iErrorLevel);
-            }
-
-            // write the update logs
-            $iFileSize = filesize($sImportFile);
-            $sSQL = 'INSERT INTO import_osmosis_log';
-            $sSQL .= '(batchend, batchseq, batchsize, starttime, endtime, event)';
-            $sSQL .= " values ('$sBatchEnd',$iEndSequence,$iFileSize,'";
-            $sSQL .= date('Y-m-d H:i:s', $fCMDStartTime)."','";
-            $sSQL .= date('Y-m-d H:i:s')."','import')";
-            var_Dump($sSQL);
-            $oDB->exec($sSQL);
-
-            // update the status
-            $sSQL = "UPDATE import_status SET lastimportdate = '$sBatchEnd', indexed=false, sequence_id = $iEndSequence";
-            var_Dump($sSQL);
-            $oDB->exec($sSQL);
-            echo date('Y-m-d H:i:s')." Completed download step for $sBatchEnd in ".round((time()-$fCMDStartTime)/60, 2)." minutes\n";
-        }
-
-        // Index file
-        if (!$aResult['no-index']) {
-            $fCMDStartTime = time();
-
-            $oThisIndexCmd = clone($oNominatimCmd);
-            $oThisIndexCmd->addParams('index');
-            echo $oThisIndexCmd->escapedCmd()."\n";
-            $iErrorLevel = $oThisIndexCmd->run();
-            if ($iErrorLevel) {
-                echo "Error: $iErrorLevel\n";
-                exit($iErrorLevel);
-            }
-
-            $sSQL = 'INSERT INTO import_osmosis_log';
-            $sSQL .= '(batchend, batchseq, batchsize, starttime, endtime, event)';
-            $sSQL .= " values ('$sBatchEnd',$iEndSequence,NULL,'";
-            $sSQL .= date('Y-m-d H:i:s', $fCMDStartTime)."','";
-            $sSQL .= date('Y-m-d H:i:s')."','index')";
-            var_Dump($sSQL);
-            $oDB->exec($sSQL);
-            echo date('Y-m-d H:i:s')." Completed index step for $sBatchEnd in ".round((time()-$fCMDStartTime)/60, 2)." minutes\n";
-        } else {
-            if ($aResult['import-osmosis-all']) {
-                echo "Error: --no-index cannot be used with continuous imports (--import-osmosis-all).\n";
-                exit(1);
-            }
-        }
-
-        $fDuration = time() - $fStartTime;
-        echo date('Y-m-d H:i:s')." Completed all for $sBatchEnd in ".round($fDuration/60, 2)." minutes\n";
-        if (!$aResult['import-osmosis-all']) exit(0);
+    if ($aResult['no-index']) {
+        $oCmd->addParams('--no-index');
     }
+
+    exit($oCmd->run());
 }
index edfddcbc721e2b4eb5ee4b7afec42e9f7b62ba51..f02277aebbce8e476e98f7989218eb395f839ac7 100644 (file)
@@ -2,8 +2,10 @@
 Command-line interface to the Nominatim functions for import, update,
 database administration and querying.
 """
-import sys
+import datetime as dt
 import os
+import sys
+import time
 import argparse
 import logging
 from pathlib import Path
@@ -11,6 +13,7 @@ from pathlib import Path
 from .config import Configuration
 from .tools.exec_utils import run_legacy_script, run_api_script
 from .db.connection import connect
+from .db import status
 
 LOG = logging.getLogger()
 
@@ -88,6 +91,17 @@ class CommandlineParser:
 
         return args.command.run(args)
 
+
+def _osm2pgsql_options_from_args(args, default_cache, default_threads):
+    """ Set up the stanadrd osm2pgsql from the command line arguments.
+    """
+    return dict(osm2pgsql=args.osm2pgsql_path,
+                osm2pgsql_cache=args.osm2pgsql_cache or default_cache,
+                osm2pgsql_style=args.config.get_import_style_file(),
+                threads=args.threads or default_threads,
+                dsn=args.config.get_libpq_dsn(),
+                flatnode_file=args.config.FLATNODE_FILE)
+
 ##### Subcommand classes
 #
 # Each class needs to implement two functions: add_args() adds the CLI parameters
@@ -231,6 +245,88 @@ class UpdateReplication:
         group.add_argument('--no-index', action='store_false', dest='do_index',
                            help="""Do not index the new data. Only applicable
                                    together with --once""")
+        group.add_argument('--osm2pgsql-cache', metavar='SIZE', type=int,
+                           help='Size of cache to be used by osm2pgsql (in MB)')
+
+    @staticmethod
+    def _init_replication(args):
+        from .tools import replication, refresh
+
+        LOG.warning("Initialising replication updates")
+        conn = connect(args.config.get_libpq_dsn())
+        replication.init_replication(conn, base_url=args.config.REPLICATION_URL)
+        if args.update_functions:
+            LOG.warning("Create functions")
+            refresh.create_functions(conn, args.config, args.data_dir,
+                                     True, False)
+        conn.close()
+        return 0
+
+
+    @staticmethod
+    def _check_for_updates(args):
+        from .tools import replication
+
+        conn = connect(args.config.get_libpq_dsn())
+        ret = replication.check_for_updates(conn, base_url=args.config.REPLICATION_URL)
+        conn.close()
+        return ret
+
+
+    @staticmethod
+    def _update(args):
+        from .tools import replication
+        from .indexer.indexer import Indexer
+
+        params = _osm2pgsql_options_from_args(args, 2000, 1)
+        params.update(base_url=args.config.REPLICATION_URL,
+                      update_interval=args.config.get_int('REPLICATION_UPDATE_INTERVAL'),
+                      import_file=args.project_dir / 'osmosischange.osc',
+                      max_diff_size=args.config.get_int('REPLICATION_MAX_DIFF'),
+                      indexed_only=not args.once)
+
+        # Sanity check to not overwhelm the Geofabrik servers.
+        if 'download.geofabrik.de'in params['base_url']\
+           and params['update_interval'] < 86400:
+            LOG.fatal("Update interval too low for download.geofabrik.de.\n"
+                      "Please check install documentation "
+                      "(https://nominatim.org/release-docs/latest/admin/Import-and-Update#"
+                      "setting-up-the-update-process).")
+            raise RuntimeError("Invalid replication update interval setting.")
+
+        if not args.once:
+            if not args.do_index:
+                LOG.fatal("Indexing cannot be disabled when running updates continuously.")
+                raise RuntimeError("Bad arguments.")
+            recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
+
+        while True:
+            conn = connect(args.config.get_libpq_dsn())
+            start = dt.datetime.now(dt.timezone.utc)
+            state = replication.update(conn, params)
+            status.log_status(conn, start, 'import')
+            conn.close()
+
+            if state is not replication.UpdateState.NO_CHANGES and args.do_index:
+                start = dt.datetime.now(dt.timezone.utc)
+                indexer = Indexer(args.config.get_libpq_dsn(),
+                                  args.threads or 1)
+                indexer.index_boundaries(0, 30)
+                indexer.index_by_rank(0, 30)
+
+                conn = connect(args.config.get_libpq_dsn())
+                status.set_indexed(conn, True)
+                status.log_status(conn, start, 'index')
+                conn.close()
+
+            if args.once:
+                break
+
+            if state is replication.UpdateState.NO_CHANGES:
+                LOG.warning("No new changes. Sleeping for %d sec.", recheck_interval)
+                time.sleep(recheck_interval)
+
+        return state.value
 
     @staticmethod
     def run(args):
@@ -241,35 +337,13 @@ class UpdateReplication:
                       "To install pyosmium via pip: pip3 install osmium")
             return 1
 
-        from .tools import replication, refresh
-
-        conn = connect(args.config.get_libpq_dsn())
-
-        params = ['update.php']
         if args.init:
-            LOG.warning("Initialising replication updates")
-            replication.init_replication(conn, args.config.REPLICATION_URL)
-            if args.update_functions:
-                LOG.warning("Create functions")
-                refresh.create_functions(conn, args.config, args.data_dir,
-                                         True, False)
-            conn.close()
-            return 0
+            return UpdateReplication._init_replication(args)
 
         if args.check_for_updates:
-            ret = replication.check_for_updates(conn, args.config.REPLICATION_URL)
-            conn.close()
-            return ret
-
-        if args.once:
-            params.append('--import-osmosis')
-        else:
-            params.append('--import-osmosis-all')
-        if not args.do_index:
-            params.append('--no-index')
-
-        return run_legacy_script(*params, nominatim_env=args)
+            return UpdateReplication._check_for_updates(args)
 
+        return UpdateReplication._update(args)
 
 class UpdateAddData:
     """\
@@ -350,8 +424,11 @@ class UpdateIndex:
         if not args.boundaries_only:
             indexer.index_by_rank(args.minrank, args.maxrank)
 
-        if not args.no_boundaries and not args.boundaries_only:
-            indexer.update_status_table()
+        if not args.no_boundaries and not args.boundaries_only \
+           and args.minrank == 0 and args.maxrank == 30:
+            conn = connect(args.config.get_libpq_dsn())
+            status.set_indexed(conn, True)
+            conn.close()
 
         return 0
 
@@ -390,25 +467,31 @@ class UpdateRefresh:
     def run(args):
         from .tools import refresh
 
-        conn = connect(args.config.get_libpq_dsn())
-
         if args.postcodes:
             LOG.warning("Update postcodes centroid")
+            conn = connect(args.config.get_libpq_dsn())
             refresh.update_postcodes(conn, args.data_dir)
+            conn.close()
 
         if args.word_counts:
             LOG.warning('Recompute frequency of full-word search terms')
+            conn = connect(args.config.get_libpq_dsn())
             refresh.recompute_word_counts(conn, args.data_dir)
+            conn.close()
 
         if args.address_levels:
             cfg = Path(args.config.ADDRESS_LEVEL_CONFIG)
             LOG.warning('Updating address levels from %s', cfg)
+            conn = connect(args.config.get_libpq_dsn())
             refresh.load_address_levels_from_file(conn, cfg)
+            conn.close()
 
         if args.functions:
             LOG.warning('Create functions')
+            conn = connect(args.config.get_libpq_dsn())
             refresh.create_functions(conn, args.config, args.data_dir,
                                      args.diffs, args.enable_debug_statements)
+            conn.close()
 
         if args.wiki_data:
             run_legacy_script('setup.php', '--import-wikipedia-articles',
@@ -421,8 +504,6 @@ class UpdateRefresh:
             run_legacy_script('setup.php', '--setup-website',
                               nominatim_env=args, throw_on_fail=True)
 
-        conn.close()
-
         return 0
 
 
index 3f75ce33eecb5e46839984a3df1f700ba0a51fc4..271d2d4d4ee20b0d37fde49de1429b5b9da7f0fc 100644 (file)
@@ -1,10 +1,14 @@
 """
 Nominatim configuration accessor.
 """
+import logging
 import os
+from pathlib import Path
 
 from dotenv import dotenv_values
 
+LOG = logging.getLogger()
+
 class Configuration:
     """ Load and manage the project configuration.
 
@@ -21,6 +25,7 @@ class Configuration:
 
     def __init__(self, project_dir, config_dir):
         self.project_dir = project_dir
+        self.config_dir = config_dir
         self._config = dotenv_values(str((config_dir / 'env.defaults').resolve()))
         if project_dir is not None:
             self._config.update(dotenv_values(str((project_dir / '.env').resolve())))
@@ -38,24 +43,56 @@ class Configuration:
         return os.environ.get(name) or self._config[name]
 
     def get_bool(self, name):
-        """ Return the given configuration parameters as a boolean.
+        """ Return the given configuration parameter as a boolean.
             Values of '1', 'yes' and 'true' are accepted as truthy values,
             everything else is interpreted as false.
         """
         return self.__getattr__(name).lower() in ('1', 'yes', 'true')
 
+
+    def get_int(self, name):
+        """ Return the given configuration parameter as an int.
+        """
+        try:
+            return int(self.__getattr__(name))
+        except ValueError:
+            LOG.fatal("Invalid setting NOMINATIM_%s. Needs to be a number.", name)
+            raise
+
+
     def get_libpq_dsn(self):
         """ Get configured database DSN converted into the key/value format
             understood by libpq and psycopg.
         """
         dsn = self.DATABASE_DSN
 
+        def quote_param(param):
+            key, val = param.split('=')
+            val = val.replace('\\', '\\\\').replace("'", "\\'")
+            if ' ' in val:
+                val = "'" + val + "'"
+            return key + '=' + val
+
         if dsn.startswith('pgsql:'):
             # Old PHP DSN format. Convert before returning.
-            return dsn[6:].replace(';', ' ')
+            return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
 
         return dsn
 
+
+    def get_import_style_file(self):
+        """ Return the import style file as a path object. Translates the
+            name of the standard styles automatically into a file in the
+            config style.
+        """
+        style = self.__getattr__('IMPORT_STYLE')
+
+        if style in ('admin', 'street', 'address', 'full', 'extratags'):
+            return self.config_dir / 'import-{}.style'.format(style)
+
+        return Path(style)
+
+
     def get_os_env(self):
         """ Return a copy of the OS environment with the Nominatim configuration
             merged in.
index a0454771e0e0653d386d480050082fa047e0ca2c..9d7344c48754e93313be543a60d3120ed02fcb61 100644 (file)
@@ -1,5 +1,5 @@
 """
-Access and helper functions for the status table.
+Access and helper functions for the status and status log table.
 """
 import datetime as dt
 import logging
@@ -61,3 +61,21 @@ def get_status(conn):
 
         row = cur.fetchone()
         return row['lastimportdate'], row['sequence_id'], row['indexed']
+
+
+def set_indexed(conn, state):
+    """ Set the indexed flag in the status table to the given state.
+    """
+    with conn.cursor() as cur:
+        cur.execute("UPDATE import_status SET indexed = %s", (state, ))
+    conn.commit()
+
+
+def log_status(conn, start, event, batchsize=None):
+    """ Write a new status line to the `import_osmosis_log` table.
+    """
+    with conn.cursor() as cur:
+        cur.execute("""INSERT INTO import_osmosis_log
+                       (batchend, batchseq, batchsize, starttime, endtime, event)
+                       SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
+                    (batchsize, start, event))
index 0d3db20474ae79c67f74f3338dee026eb3ca4ed7..03bed98600f6678c5fff6f152ca22cbeeb8ffa47 100644 (file)
@@ -2,10 +2,13 @@
 Helper functions for executing external programs.
 """
 import logging
+import os
 import subprocess
 import urllib.request as urlrequest
 from urllib.parse import urlencode
 
+from psycopg2.extensions import parse_dsn
+
 from ..version import NOMINATIM_VERSION
 
 LOG = logging.getLogger()
@@ -87,6 +90,41 @@ def run_api_script(endpoint, project_dir, extra_env=None, phpcgi_bin=None,
     return 0
 
 
+def run_osm2pgsql(options):
+    """ Run osm2pgsql with the given options.
+    """
+    env = os.environ
+    cmd = [options['osm2pgsql'],
+           '--hstore', '--latlon', '--slim',
+           '--with-forward-dependencies', 'false',
+           '--log-progress', 'true',
+           '--number-processes', str(options['threads']),
+           '--cache', str(options['osm2pgsql_cache']),
+           '--output', 'gazetteer',
+           '--style', str(options['osm2pgsql_style'])
+          ]
+    if options['append']:
+        cmd.append('--append')
+
+    if options['flatnode_file']:
+        cmd.extend(('--flat-nodes', options['flatnode_file']))
+
+    dsn = parse_dsn(options['dsn'])
+    if 'password' in dsn:
+        env['PGPASSWORD'] = dsn['password']
+    if 'dbname' in dsn:
+        cmd.extend(('-d', dsn['dbname']))
+    if 'user' in dsn:
+        cmd.extend(('--username', dsn['user']))
+    for param in ('host', 'port'):
+        if param in dsn:
+            cmd.extend(('--' + param, dsn[param]))
+
+    cmd.append(str(options['import_file']))
+
+    subprocess.run(cmd, cwd=options.get('cwd', '.'), env=env, check=True)
+
+
 def get_url(url):
     """ Get the contents from the given URL and return it as a UTF-8 string.
     """
index f278556af90d37e906dbd02f9a4ca283f7aad39c..04f1c45b9cd728b7b0abe7cbb5ff892eb0fd7bef 100644 (file)
@@ -1,12 +1,16 @@
 """
 Functions for updating a database from a replication source.
 """
-import datetime
+import datetime as dt
+from enum import Enum
 import logging
+import time
 
 from osmium.replication.server import ReplicationServer
+from osmium import WriteHandler
 
 from ..db import status
+from .exec_utils import run_osm2pgsql
 
 LOG = logging.getLogger()
 
@@ -17,7 +21,7 @@ def init_replication(conn, base_url):
     date = status.compute_database_date(conn)
 
     # margin of error to make sure we get all data
-    date -= datetime.timedelta(hours=3)
+    date -= dt.timedelta(hours=3)
 
     repl = ReplicationServer(base_url)
 
@@ -53,7 +57,62 @@ def check_for_updates(conn, base_url):
 
     if state.sequence <= seq:
         LOG.warning("Database is up to date.")
-        return 1
+        return 2
 
     LOG.warning("New data available (%i => %i).", seq, state.sequence)
     return 0
+
+class UpdateState(Enum):
+    """ Possible states after an update has run.
+    """
+
+    UP_TO_DATE = 0
+    MORE_PENDING = 2
+    NO_CHANGES = 3
+
+
+def update(conn, options):
+    """ Update database from the next batch of data. Returns the state of
+        updates according to `UpdateState`.
+    """
+    startdate, startseq, indexed = status.get_status(conn)
+
+    if startseq is None:
+        LOG.error("Replication not set up. "
+                  "Please run 'nominatim replication --init' first.")
+        raise RuntimeError("Replication not set up.")
+
+    if not indexed and options['indexed_only']:
+        LOG.info("Skipping update. There is data that needs indexing.")
+        return UpdateState.MORE_PENDING
+
+    last_since_update = dt.datetime.now(dt.timezone.utc) - startdate
+    update_interval = dt.timedelta(seconds=options['update_interval'])
+    if last_since_update < update_interval:
+        duration = (update_interval - last_since_update).seconds
+        LOG.warning("Sleeping for %s sec before next update.", duration)
+        time.sleep(duration)
+
+    if options['import_file'].exists():
+        options['import_file'].unlink()
+
+    # Read updates into file.
+    repl = ReplicationServer(options['base_url'])
+
+    outhandler = WriteHandler(str(options['import_file']))
+    endseq = repl.apply_diffs(outhandler, startseq,
+                              max_size=options['max_diff_size'] * 1024)
+    outhandler.close()
+
+    if endseq is None:
+        return UpdateState.NO_CHANGES
+
+    # Consume updates with osm2pgsql.
+    options['append'] = True
+    run_osm2pgsql(options)
+
+    # Write the current status to the file
+    endstate = repl.get_state_info(endseq)
+    status.set_status(conn, endstate.timestamp, seq=endseq, indexed=False)
+
+    return UpdateState.UP_TO_DATE
index 2e81e91997ae7a902727013a1f1593088fba0a9f..8b0ba145c89d15bb473b97758b749449a476c8e2 100644 (file)
@@ -101,7 +101,8 @@ def def_config():
 
 @pytest.fixture
 def status_table(temp_db_conn):
-    """ Create an empty version of the status table.
+    """ Create an empty version of the status table and
+        the status logging table.
     """
     with temp_db_conn.cursor() as cur:
         cur.execute("""CREATE TABLE import_status (
@@ -109,6 +110,14 @@ def status_table(temp_db_conn):
                            sequence_id integer,
                            indexed boolean
                        )""")
+        cur.execute("""CREATE TABLE import_osmosis_log (
+                           batchend timestamp,
+                           batchseq integer,
+                           batchsize bigint,
+                           starttime timestamp,
+                           endtime timestamp,
+                           event text
+                           )""")
     temp_db_conn.commit()
 
 
index 71e3ff65cd8e9aed9005223353fff0c08ba9aca0..fc2454cd219a3f85edf14b3c2eac7c13f7f851b6 100644 (file)
@@ -1,8 +1,13 @@
 """
 Tests for command line interface wrapper.
+
+These tests just check that the various command line parameters route to the
+correct functionality. They use a lot of monkeypatching to avoid executing
+the actual functions.
 """
 import psycopg2
 import pytest
+import time
 
 import nominatim.cli
 import nominatim.indexer.indexer
@@ -21,9 +26,9 @@ class MockParamCapture:
     """ Mock that records the parameters with which a function was called
         as well as the number of calls.
     """
-    def __init__(self):
+    def __init__(self, retval=0):
         self.called = 0
-        self.return_value = 0
+        self.return_value = retval
 
     def __call__(self, *args, **kwargs):
         self.called += 1
@@ -142,6 +147,69 @@ def test_replication_command(monkeypatch, temp_db, params, func):
     assert func_mock.called == 1
 
 
+def test_replication_update_bad_interval(monkeypatch, temp_db):
+    monkeypatch.setenv('NOMINATIM_REPLICATION_UPDATE_INTERVAL', 'xx')
+
+    with pytest.raises(ValueError):
+        call_nominatim('replication')
+
+
+def test_replication_update_bad_interval_for_geofabrik(monkeypatch, temp_db):
+    monkeypatch.setenv('NOMINATIM_REPLICATION_URL',
+                       'https://download.geofabrik.de/europe/ireland-and-northern-ireland-updates')
+
+    with pytest.raises(RuntimeError, match='Invalid replication.*'):
+        call_nominatim('replication')
+
+
+@pytest.mark.parametrize("state, retval", [
+                         (nominatim.tools.replication.UpdateState.UP_TO_DATE, 0),
+                         (nominatim.tools.replication.UpdateState.NO_CHANGES, 3)
+                         ])
+def test_replication_update_once_no_index(monkeypatch, temp_db, status_table, state, retval):
+    func_mock = MockParamCapture(retval=state)
+    monkeypatch.setattr(nominatim.tools.replication, 'update', func_mock)
+
+    assert retval == call_nominatim('replication', '--once', '--no-index')
+
+
+def test_replication_update_continuous(monkeypatch, status_table):
+    states = [nominatim.tools.replication.UpdateState.UP_TO_DATE,
+              nominatim.tools.replication.UpdateState.UP_TO_DATE]
+    monkeypatch.setattr(nominatim.tools.replication, 'update',
+                        lambda *args, **kwargs: states.pop())
+
+    index_mock = MockParamCapture()
+    monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_boundaries', index_mock)
+    monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_by_rank', index_mock)
+
+    with pytest.raises(IndexError):
+        call_nominatim('replication')
+
+    assert index_mock.called == 4
+
+
+def test_replication_update_continuous_no_change(monkeypatch, status_table):
+    states = [nominatim.tools.replication.UpdateState.NO_CHANGES,
+              nominatim.tools.replication.UpdateState.UP_TO_DATE]
+    monkeypatch.setattr(nominatim.tools.replication, 'update',
+                        lambda *args, **kwargs: states.pop())
+
+    index_mock = MockParamCapture()
+    monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_boundaries', index_mock)
+    monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_by_rank', index_mock)
+
+    sleep_mock = MockParamCapture()
+    monkeypatch.setattr(time, 'sleep', sleep_mock)
+
+    with pytest.raises(IndexError):
+        call_nominatim('replication')
+
+    assert index_mock.called == 2
+    assert sleep_mock.called == 1
+    assert sleep_mock.last_args[0] == 60
+
+
 @pytest.mark.parametrize("params", [
                          ('search', '--query', 'new'),
                          ('reverse', '--lat', '0', '--lon', '0'),
index 064f71ba38c816b1eaccb8615d5fa8bbfc9d0a55..f9fefeb2215c900279a9bad81d22ecce35ff6a5c 100644 (file)
@@ -69,6 +69,18 @@ def test_get_libpq_dsn_convert_php(monkeypatch):
     assert config.get_libpq_dsn() == 'dbname=gis password=foo host=localhost'
 
 
+@pytest.mark.parametrize("val,expect", [('foo bar', "'foo bar'"),
+                                        ("xy'z", "xy\\'z"),
+                                       ])
+def test_get_libpq_dsn_convert_php_special_chars(monkeypatch, val, expect):
+    config = Configuration(None, DEFCFG_DIR)
+
+    monkeypatch.setenv('NOMINATIM_DATABASE_DSN',
+                       'pgsql:dbname=gis;password={}'.format(val))
+
+    assert config.get_libpq_dsn() == "dbname=gis password={}".format(expect)
+
+
 def test_get_libpq_dsn_convert_libpq(monkeypatch):
     config = Configuration(None, DEFCFG_DIR)
 
@@ -93,3 +105,51 @@ def test_get_bool_empty():
 
     assert config.DATABASE_MODULE_PATH == ''
     assert config.get_bool('DATABASE_MODULE_PATH') == False
+
+
+@pytest.mark.parametrize("value,result", [('0', 0), ('1', 1),
+                                          ('85762513444', 85762513444)])
+def test_get_int_success(monkeypatch, value, result):
+    config = Configuration(None, DEFCFG_DIR)
+
+    monkeypatch.setenv('NOMINATIM_FOOBAR', value)
+
+    assert config.get_int('FOOBAR') == result
+
+
+@pytest.mark.parametrize("value", ['1b', 'fg', '0x23'])
+def test_get_int_bad_values(monkeypatch, value):
+    config = Configuration(None, DEFCFG_DIR)
+
+    monkeypatch.setenv('NOMINATIM_FOOBAR', value)
+
+    with pytest.raises(ValueError):
+        config.get_int('FOOBAR')
+
+
+def test_get_int_empty():
+    config = Configuration(None, DEFCFG_DIR)
+
+    assert config.DATABASE_MODULE_PATH == ''
+
+    with pytest.raises(ValueError):
+        config.get_int('DATABASE_MODULE_PATH')
+
+
+def test_get_import_style_intern(monkeypatch):
+    config = Configuration(None, DEFCFG_DIR)
+
+    monkeypatch.setenv('NOMINATIM_IMPORT_STYLE', 'street')
+
+    expected = DEFCFG_DIR / 'import-street.style'
+
+    assert config.get_import_style_file() == expected
+
+
+@pytest.mark.parametrize("value", ['custom', '/foo/bar.stye'])
+def test_get_import_style_intern(monkeypatch, value):
+    config = Configuration(None, DEFCFG_DIR)
+
+    monkeypatch.setenv('NOMINATIM_IMPORT_STYLE', value)
+
+    assert str(config.get_import_style_file()) == value
index 9631170af37678bf959b8191ba63feaef652cf77..1a538aec2156f2687d360565bedb03db23f64ce1 100644 (file)
@@ -84,3 +84,30 @@ def test_get_status_success(status_table, temp_db_conn):
 
     assert nominatim.db.status.get_status(temp_db_conn) == \
              (date, 667, False)
+
+
+@pytest.mark.parametrize("old_state", [True, False])
+@pytest.mark.parametrize("new_state", [True, False])
+def test_set_indexed(status_table, temp_db_conn, temp_db_cursor, old_state, new_state):
+    date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
+    nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
+    nominatim.db.status.set_indexed(temp_db_conn, new_state)
+
+    assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
+
+
+def test_set_indexed_empty_status(status_table, temp_db_conn, temp_db_cursor):
+    nominatim.db.status.set_indexed(temp_db_conn, True)
+
+    assert temp_db_cursor.scalar("SELECT count(*) FROM import_status") == 0
+
+
+def test_log_status(status_table, temp_db_conn):
+    date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
+    start = dt.datetime.now() - dt.timedelta(hours=1)
+    nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
+    nominatim.db.status.log_status(temp_db_conn, start, 'index')
+
+    assert temp_db_cursor.scalar("SELECT count(*) FROM import_osmosis_log") == 1
+    assert temp_db_cursor.scalar("SELECT seq FROM import_osmosis_log") == 56
+    assert temp_db_cursor.scalar("SELECT date FROM import_osmosis_log") == date
index a4eef61f5b24315f76bcfd789863e3f1d3a3a205..26a714f3dc6169a398e64041da70d7b7ec844af5 100644 (file)
@@ -99,3 +99,12 @@ def test_run_api_with_extra_env(tmp_project_dir):
     extra_env = dict(SCRIPT_FILENAME=str(tmp_project_dir / 'website' / 'test.php'))
     assert 0 == exec_utils.run_api_script('badname', tmp_project_dir,
                                           extra_env=extra_env)
+
+
+### run_osm2pgsql
+
+def test_run_osm2pgsql():
+    exec_utils.run_osm2pgsql(dict(osm2pgsql='echo', append=False, flatnode_file=None,
+                                  dsn='dbname=foobar', threads=1, osm2pgsql_cache=500,
+                                  osm2pgsql_style='./my.style',
+                                  import_file='foo.bar'))
index e06eda59a71d352ecd88e9b801e9a42dc73976b4..b94563ffa849f8c7c2ffd3a77b8cef1ce0cf3f99 100644 (file)
@@ -2,6 +2,7 @@
 Tests for replication functionality.
 """
 import datetime as dt
+import time
 
 import pytest
 from osmium.replication.server import OsmosisState
@@ -16,6 +17,7 @@ OSM_NODE_DATA = """\
 </osm>
 """
 
+### init replication
 
 def test_init_replication_bad_base_url(monkeypatch, status_table, place_row, temp_db_conn, temp_db_cursor):
     place_row(osm_type='N', osm_id=100)
@@ -43,20 +45,20 @@ def test_init_replication_success(monkeypatch, status_table, place_row, temp_db_
     assert temp_db_cursor.fetchone() == [expected_date, 234, True]
 
 
+### checking for updates
+
 def test_check_for_updates_empty_status_table(status_table, temp_db_conn):
     assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
 
 
 def test_check_for_updates_seq_not_set(status_table, temp_db_conn):
-    status.set_status(temp_db_conn, dt.datetime.now().replace(tzinfo=dt.timezone.utc))
+    status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc))
 
     assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
 
 
 def test_check_for_updates_no_state(monkeypatch, status_table, temp_db_conn):
-    status.set_status(temp_db_conn,
-                      dt.datetime.now().replace(tzinfo=dt.timezone.utc),
-                      seq=345)
+    status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=345)
 
     monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
                         "get_state_info", lambda self: None)
@@ -64,10 +66,10 @@ def test_check_for_updates_no_state(monkeypatch, status_table, temp_db_conn):
     assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 253
 
 
-@pytest.mark.parametrize("server_sequence,result", [(344, 1), (345, 1), (346, 0)])
+@pytest.mark.parametrize("server_sequence,result", [(344, 2), (345, 2), (346, 0)])
 def test_check_for_updates_no_new_data(monkeypatch, status_table, temp_db_conn,
                                        server_sequence, result):
-    date = dt.datetime.now().replace(tzinfo=dt.timezone.utc)
+    date = dt.datetime.now(dt.timezone.utc)
     status.set_status(temp_db_conn, date, seq=345)
 
     monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
@@ -75,3 +77,61 @@ def test_check_for_updates_no_new_data(monkeypatch, status_table, temp_db_conn,
                         lambda self: OsmosisState(server_sequence, date))
 
     assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == result
+
+
+### updating
+
+@pytest.fixture
+def update_options(tmpdir):
+    return dict(base_url='https://test.io',
+                indexed_only=False,
+                update_interval=3600,
+                import_file=tmpdir / 'foo.osm',
+                max_diff_size=1)
+
+def test_update_empty_status_table(status_table, temp_db_conn):
+    with pytest.raises(RuntimeError):
+        nominatim.tools.replication.update(temp_db_conn, {})
+
+
+def test_update_already_indexed(status_table, temp_db_conn):
+    status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=34, indexed=False)
+
+    assert nominatim.tools.replication.update(temp_db_conn, dict(indexed_only=True)) \
+             == nominatim.tools.replication.UpdateState.MORE_PENDING
+
+
+def test_update_no_data_no_sleep(monkeypatch, status_table, temp_db_conn, update_options):
+    date = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=1)
+    status.set_status(temp_db_conn, date, seq=34)
+
+    monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
+                        "apply_diffs",
+                        lambda *args, **kwargs: None)
+
+    sleeptime = []
+    monkeypatch.setattr(time, 'sleep', lambda s: sleeptime.append(s))
+
+    assert nominatim.tools.replication.update(temp_db_conn, update_options) \
+             == nominatim.tools.replication.UpdateState.NO_CHANGES
+
+    assert not sleeptime
+
+
+def test_update_no_data_sleep(monkeypatch, status_table, temp_db_conn, update_options):
+    date = dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=30)
+    status.set_status(temp_db_conn, date, seq=34)
+
+    monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
+                        "apply_diffs",
+                        lambda *args, **kwargs: None)
+
+    sleeptime = []
+    monkeypatch.setattr(time, 'sleep', lambda s: sleeptime.append(s))
+
+    assert nominatim.tools.replication.update(temp_db_conn, update_options) \
+             == nominatim.tools.replication.UpdateState.NO_CHANGES
+
+    assert len(sleeptime) == 1
+    assert sleeptime[0] < 3600
+    assert sleeptime[0] > 0
diff --git a/utils/osm_file_date.py b/utils/osm_file_date.py
deleted file mode 100755 (executable)
index 0443e6a..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env python3
-
-import osmium
-import sys
-import datetime
-
-
-class Datecounter(osmium.SimpleHandler):
-
-    filedate = None
-
-    def date(self, o):
-        ts = o.timestamp
-        if self.filedate is None or ts > self.filedate:
-            self.filedate = ts
-
-    node = date
-    way = date
-    relation = date
-
-
-if __name__ == '__main__':
-    if len(sys.argv) != 2:
-        print("Usage: python osm_file_date.py <osmfile>")
-        sys.exit(-1)
-
-    h = Datecounter()
-
-    h.apply_file(sys.argv[1])
-
-    if h.filedate is None:
-        exit(5)
-
-    print(h.filedate)