From: Sarah Hoffmann Date: Thu, 11 Mar 2021 21:48:38 +0000 (+0100) Subject: Merge pull request #2204 from darkshredder/tiger-data X-Git-Tag: v3.7.0~20 X-Git-Url: https://git.openstreetmap.org/nominatim.git/commitdiff_plain/9086a794a1ca9ed08d66c9aca5cea25d1f6006bf?hp=35f4695b67b50a41caf69378ea13044ea91f1d2c Merge pull request #2204 from darkshredder/tiger-data Ported tiger-data-import to Python and Added Tarball Support --- diff --git a/lib-php/admin/setup.php b/lib-php/admin/setup.php index f81c0ca8..0b008495 100644 --- a/lib-php/admin/setup.php +++ b/lib-php/admin/setup.php @@ -161,7 +161,7 @@ if ($aCMDResult['load-data'] || $aCMDResult['all']) { if ($aCMDResult['import-tiger-data']) { $bDidSomething = true; $sTigerPath = getSetting('TIGER_DATA_PATH', CONST_InstallDir.'/tiger'); - $oSetup->importTigerData($sTigerPath); + run((clone($oNominatimCmd))->addParams('transition', '--tiger-data', $sTigerPath)); } if ($aCMDResult['calculate-postcodes'] || $aCMDResult['all']) { diff --git a/lib-php/setup/SetupClass.php b/lib-php/setup/SetupClass.php index 4b6439a9..cf2ac6da 100755 --- a/lib-php/setup/SetupClass.php +++ b/lib-php/setup/SetupClass.php @@ -67,76 +67,6 @@ class SetupFunctions } } - public function importTigerData($sTigerPath) - { - info('Import Tiger data'); - - $aFilenames = glob($sTigerPath.'/*.sql'); - info('Found '.count($aFilenames).' SQL files in path '.$sTigerPath); - if (empty($aFilenames)) { - warn('Tiger data import selected but no files found in path '.$sTigerPath); - return; - } - $sTemplate = file_get_contents(CONST_SqlDir.'/tiger_import_start.sql'); - $sTemplate = $this->replaceSqlPatterns($sTemplate); - - $this->pgsqlRunScript($sTemplate, false); - - $aDBInstances = array(); - for ($i = 0; $i < $this->iInstances; $i++) { - // https://secure.php.net/manual/en/function.pg-connect.php - $DSN = getSetting('DATABASE_DSN'); - $DSN = preg_replace('/^pgsql:/', '', $DSN); - $DSN = preg_replace('/;/', ' ', $DSN); - $aDBInstances[$i] = pg_connect($DSN, PGSQL_CONNECT_FORCE_NEW | PGSQL_CONNECT_ASYNC); - pg_ping($aDBInstances[$i]); - } - - foreach ($aFilenames as $sFile) { - echo $sFile.': '; - $hFile = fopen($sFile, 'r'); - $sSQL = fgets($hFile, 100000); - $iLines = 0; - while (true) { - for ($i = 0; $i < $this->iInstances; $i++) { - if (!pg_connection_busy($aDBInstances[$i])) { - while (pg_get_result($aDBInstances[$i])); - $sSQL = fgets($hFile, 100000); - if (!$sSQL) break 2; - if (!pg_send_query($aDBInstances[$i], $sSQL)) fail(pg_last_error($aDBInstances[$i])); - $iLines++; - if ($iLines == 1000) { - echo '.'; - $iLines = 0; - } - } - } - usleep(10); - } - fclose($hFile); - - $bAnyBusy = true; - while ($bAnyBusy) { - $bAnyBusy = false; - for ($i = 0; $i < $this->iInstances; $i++) { - if (pg_connection_busy($aDBInstances[$i])) $bAnyBusy = true; - } - usleep(10); - } - echo "\n"; - } - - for ($i = 0; $i < $this->iInstances; $i++) { - pg_close($aDBInstances[$i]); - } - - info('Creating indexes on Tiger data'); - $sTemplate = file_get_contents(CONST_SqlDir.'/tiger_import_finish.sql'); - $sTemplate = $this->replaceSqlPatterns($sTemplate); - - $this->pgsqlRunScript($sTemplate, false); - } - public function calculatePostcodes($bCMDResultAll) { info('Calculate Postcodes'); diff --git a/lib-sql/tiger_import_finish.sql b/lib-sql/tiger_import_finish.sql index 374c00b3..39ab1ae3 100644 --- a/lib-sql/tiger_import_finish.sql +++ b/lib-sql/tiger_import_finish.sql @@ -1,13 +1,15 @@ --index only on parent_place_id -CREATE INDEX idx_location_property_tiger_parent_place_id_imp ON location_property_tiger_import (parent_place_id) {ts:aux-index}; -CREATE UNIQUE INDEX idx_location_property_tiger_place_id_imp ON location_property_tiger_import (place_id) {ts:aux-index}; +CREATE INDEX {{sql.if_index_not_exists}} idx_location_property_tiger_place_id_imp + ON location_property_tiger_import (parent_place_id) {{db.tablespace.aux_index}}; +CREATE UNIQUE INDEX {{sql.if_index_not_exists}} idx_location_property_tiger_place_id_imp + ON location_property_tiger_import (place_id) {{db.tablespace.aux_index}}; -GRANT SELECT ON location_property_tiger_import TO "{www-user}"; +GRANT SELECT ON location_property_tiger_import TO "{{config.DATABASE_WEBUSER}}"; DROP TABLE IF EXISTS location_property_tiger; ALTER TABLE location_property_tiger_import RENAME TO location_property_tiger; -ALTER INDEX idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id; -ALTER INDEX idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id; +ALTER INDEX IF EXISTS idx_location_property_tiger_parent_place_id_imp RENAME TO idx_location_property_tiger_housenumber_parent_place_id; +ALTER INDEX IF EXISTS idx_location_property_tiger_place_id_imp RENAME TO idx_location_property_tiger_place_id; DROP FUNCTION tiger_line_import (linegeo geometry, in_startnumber integer, in_endnumber integer, interpolationtype text, in_street text, in_isin text, in_postcode text); diff --git a/nominatim/cli.py b/nominatim/cli.py index 7459711f..b3d9eee6 100644 --- a/nominatim/cli.py +++ b/nominatim/cli.py @@ -13,6 +13,7 @@ from .tools.exec_utils import run_legacy_script, run_php_server from .errors import UsageError from . import clicmd from .clicmd.args import NominatimArgs +from .tools import tiger_data LOG = logging.getLogger() @@ -166,8 +167,11 @@ class UpdateAddData: @staticmethod def run(args): if args.tiger_data: - os.environ['NOMINATIM_TIGER_DATA_PATH'] = args.tiger_data - return run_legacy_script('setup.php', '--import-tiger-data', nominatim_env=args) + return tiger_data.add_tiger_data(args.config.get_libpq_dsn(), + args.tiger_data, + args.threads or 1, + args.config, + args.sqllib_dir) params = ['update.php'] if args.file: diff --git a/nominatim/clicmd/transition.py b/nominatim/clicmd/transition.py index b8db1a38..0a89cb03 100644 --- a/nominatim/clicmd/transition.py +++ b/nominatim/clicmd/transition.py @@ -58,10 +58,12 @@ class AdminTransition: help="Ignore certain erros on import.") group.add_argument('--reverse-only', action='store_true', help='Do not create search tables and indexes') + group.add_argument('--tiger-data', metavar='FILE', + help='File to import') @staticmethod def run(args): - from ..tools import database_import + from ..tools import database_import, tiger_data from ..tools import refresh if args.create_db: @@ -127,3 +129,11 @@ class AdminTransition: LOG.warning('Create Search indices') with connect(args.config.get_libpq_dsn()) as conn: database_import.create_search_indices(conn, args.config, args.sqllib_dir, args.drop) + + if args.tiger_data: + LOG.warning('Tiger data') + tiger_data.add_tiger_data(args.config.get_libpq_dsn(), + args.tiger_data, + args.threads or 1, + args.config, + args.sqllib_dir) diff --git a/nominatim/tools/tiger_data.py b/nominatim/tools/tiger_data.py new file mode 100644 index 00000000..9b960e2d --- /dev/null +++ b/nominatim/tools/tiger_data.py @@ -0,0 +1,112 @@ +""" +Functions for importing tiger data and handling tarbar and directory files +""" +import logging +import os +import tarfile +import selectors + +from ..db.connection import connect +from ..db.async_connection import DBConnection +from ..db.sql_preprocessor import SQLPreprocessor + + +LOG = logging.getLogger() + + +def handle_tarfile_or_directory(data_dir): + """ Handles tarfile or directory for importing tiger data + """ + + tar = None + if data_dir.endswith('.tar.gz'): + tar = tarfile.open(data_dir) + sql_files = [i for i in tar.getmembers() if i.name.endswith('.sql')] + LOG.warning("Found %d SQL files in tarfile with path %s", len(sql_files), data_dir) + if not sql_files: + LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir) + return None, None + else: + files = os.listdir(data_dir) + sql_files = [os.path.join(data_dir, i) for i in files if i.endswith('.sql')] + LOG.warning("Found %d SQL files in path %s", len(sql_files), data_dir) + if not sql_files: + LOG.warning("Tiger data import selected but no files found in path %s", data_dir) + return None, None + + return sql_files, tar + + +def handle_threaded_sql_statements(sel, file): + """ Handles sql statement with multiplexing + """ + + lines = 0 + end_of_file = False + # Using pool of database connections to execute sql statements + while not end_of_file: + for key, _ in sel.select(1): + conn = key.data + try: + if conn.is_done(): + sql_query = file.readline() + lines += 1 + if not sql_query: + end_of_file = True + break + conn.perform(sql_query) + if lines == 1000: + print('. ', end='', flush=True) + lines = 0 + except Exception as exc: # pylint: disable=broad-except + LOG.info('Wrong SQL statement: %s', exc) + + +def add_tiger_data(dsn, data_dir, threads, config, sqllib_dir): + """ Import tiger data from directory or tar file + """ + + sql_files, tar = handle_tarfile_or_directory(data_dir) + + if not sql_files: + return + + with connect(dsn) as conn: + sql = SQLPreprocessor(conn, config, sqllib_dir) + sql.run_sql_file(conn, 'tiger_import_start.sql') + + # Reading sql_files and then for each file line handling + # sql_query in chunks. + sel = selectors.DefaultSelector() + place_threads = max(1, threads - 1) + + # Creates a pool of database connections + for _ in range(place_threads): + conn = DBConnection(dsn) + conn.connect() + sel.register(conn, selectors.EVENT_WRITE, conn) + + for sql_file in sql_files: + if not tar: + file = open(sql_file) + else: + file = tar.extractfile(sql_file) + + handle_threaded_sql_statements(sel, file) + + # Unregistering pool of database connections + while place_threads > 0: + for key, _ in sel.select(1): + conn = key.data + sel.unregister(conn) + conn.wait() + conn.close() + place_threads -= 1 + + if tar: + tar.close() + print('\n') + LOG.warning("Creating indexes on Tiger data") + with connect(dsn) as conn: + sql = SQLPreprocessor(conn, config, sqllib_dir) + sql.run_sql_file(conn, 'tiger_import_finish.sql') diff --git a/test/python/conftest.py b/test/python/conftest.py index d16dceff..4b7cccc3 100644 --- a/test/python/conftest.py +++ b/test/python/conftest.py @@ -13,6 +13,7 @@ sys.path.insert(0, str(SRC_DIR.resolve())) from nominatim.config import Configuration from nominatim.db import connection +from nominatim.db.sql_preprocessor import SQLPreprocessor class _TestingCursor(psycopg2.extras.DictCursor): """ Extension to the DictCursor class that provides execution @@ -266,3 +267,9 @@ def osm2pgsql_options(temp_db): flatnode_file='', tablespaces=dict(slim_data='', slim_index='', main_data='', main_index='')) + +@pytest.fixture +def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory): + monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.') + table_factory('country_name', 'partition INT', (0, 1, 2)) + return SQLPreprocessor(temp_db_conn, def_config, tmp_path) diff --git a/test/python/test_cli.py b/test/python/test_cli.py index 418f4bcf..1f9cd06d 100644 --- a/test/python/test_cli.py +++ b/test/python/test_cli.py @@ -65,7 +65,6 @@ def test_cli_help(capsys): @pytest.mark.parametrize("command,script", [ (('special-phrases',), 'specialphrases'), - (('add-data', '--tiger-data', 'tiger'), 'setup'), (('add-data', '--file', 'foo.osm'), 'update'), (('export',), 'export') ]) diff --git a/test/python/test_db_sql_preprocessor.py b/test/python/test_db_sql_preprocessor.py index 3c10000f..08a195bd 100644 --- a/test/python/test_db_sql_preprocessor.py +++ b/test/python/test_db_sql_preprocessor.py @@ -5,8 +5,6 @@ from pathlib import Path import pytest -from nominatim.db.sql_preprocessor import SQLPreprocessor - @pytest.fixture def sql_factory(tmp_path): def _mk_sql(sql_body): @@ -21,13 +19,6 @@ def sql_factory(tmp_path): return _mk_sql - -@pytest.fixture -def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory): - monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.') - table_factory('country_name', 'partition INT', (0, 1, 2)) - return SQLPreprocessor(temp_db_conn, def_config, tmp_path) - @pytest.mark.parametrize("expr,ret", [ ("'a'", 'a'), ("'{{db.partitions|join}}'", '012'), diff --git a/test/python/test_tools_tiger_data.py b/test/python/test_tools_tiger_data.py new file mode 100644 index 00000000..5029a132 --- /dev/null +++ b/test/python/test_tools_tiger_data.py @@ -0,0 +1,37 @@ +""" +Test for tiger data function +""" +from pathlib import Path + +import pytest +import tarfile + +from nominatim.tools import tiger_data, database_import + + +@pytest.mark.parametrize("threads", (1, 5)) +def test_add_tiger_data(dsn, src_dir, def_config, tmp_path, sql_preprocessor, + temp_db_cursor, threads, temp_db): + temp_db_cursor.execute('CREATE EXTENSION hstore') + temp_db_cursor.execute('CREATE EXTENSION postgis') + temp_db_cursor.execute('CREATE TABLE place (id INT)') + sqlfile = tmp_path / '1010.sql' + sqlfile.write_text("""INSERT INTO place values (1)""") + tiger_data.add_tiger_data(dsn, str(tmp_path), threads, def_config, src_dir / 'lib-sql') + + assert temp_db_cursor.table_rows('place') == 1 + +@pytest.mark.parametrize("threads", (1, 5)) +def test_add_tiger_data_tarfile(dsn, src_dir, def_config, tmp_path, + temp_db_cursor, threads, temp_db, sql_preprocessor): + temp_db_cursor.execute('CREATE EXTENSION hstore') + temp_db_cursor.execute('CREATE EXTENSION postgis') + temp_db_cursor.execute('CREATE TABLE place (id INT)') + sqlfile = tmp_path / '1010.sql' + sqlfile.write_text("""INSERT INTO place values (1)""") + tar = tarfile.open("sample.tar.gz", "w:gz") + tar.add(sqlfile) + tar.close() + tiger_data.add_tiger_data(dsn, str(src_dir / 'sample.tar.gz'), threads, def_config, src_dir / 'lib-sql') + + assert temp_db_cursor.table_rows('place') == 1 \ No newline at end of file