1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions for importing tiger data and handling tarbar and directory files
 
  10 from typing import Any, TextIO, List, Union, cast, Iterator, Dict
 
  17 from psycopg.types.json import Json
 
  19 from ..config import Configuration
 
  20 from ..db.connection import connect
 
  21 from ..db.sql_preprocessor import SQLPreprocessor
 
  22 from ..errors import UsageError
 
  23 from ..db.query_pool import QueryPool
 
  24 from ..data.place_info import PlaceInfo
 
  25 from ..tokenizer.base import AbstractTokenizer
 
  28 LOG = logging.getLogger()
 
  32     """ Context manager that goes through Tiger input files which may
 
  33         either be in a directory or gzipped together in a tar file.
 
  36     def __init__(self, data_dir: str) -> None:
 
  37         self.tar_handle = None
 
  38         self.files: List[Union[str, tarfile.TarInfo]] = []
 
  40         if data_dir.endswith('.tar.gz'):
 
  42                 self.tar_handle = tarfile.open(data_dir)
 
  43             except tarfile.ReadError as err:
 
  44                 LOG.fatal("Cannot open '%s'. Is this a tar file?", data_dir)
 
  45                 raise UsageError("Cannot open Tiger data file.") from err
 
  47             self.files = [i for i in self.tar_handle.getmembers() if i.name.endswith('.csv')]
 
  48             LOG.warning("Found %d CSV files in tarfile with path %s", len(self.files), data_dir)
 
  50             files = os.listdir(data_dir)
 
  51             self.files = [os.path.join(data_dir, i) for i in files if i.endswith('.csv')]
 
  52             LOG.warning("Found %d CSV files in path %s", len(self.files), data_dir)
 
  55             LOG.warning("Tiger data import selected but no files found at %s", data_dir)
 
  57     def __enter__(self) -> 'TigerInput':
 
  60     def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
 
  62             self.tar_handle.close()
 
  63             self.tar_handle = None
 
  65     def __bool__(self) -> bool:
 
  66         return bool(self.files)
 
  68     def get_file(self, fname: Union[str, tarfile.TarInfo]) -> TextIO:
 
  69         """ Return a file handle to the next file to be processed.
 
  70             Raises an IndexError if there is no file left.
 
  72         if self.tar_handle is not None:
 
  73             extracted = self.tar_handle.extractfile(fname)
 
  74             assert extracted is not None
 
  75             return io.TextIOWrapper(extracted)
 
  77         return open(cast(str, fname), encoding='utf-8')
 
  79     def __iter__(self) -> Iterator[Dict[str, Any]]:
 
  80         """ Iterate over the lines in each file.
 
  82         for fname in self.files:
 
  83             fd = self.get_file(fname)
 
  84             yield from csv.DictReader(fd, delimiter=';')
 
  87 async def add_tiger_data(data_dir: str, config: Configuration, threads: int,
 
  88                          tokenizer: AbstractTokenizer) -> int:
 
  89     """ Import tiger data from directory or tar file `data dir`.
 
  91     dsn = config.get_libpq_dsn()
 
  93     with connect(dsn) as conn:
 
  94         if freeze.is_frozen(conn):
 
  95             raise UsageError("Tiger cannot be imported when database frozen (Github issue #3048)")
 
  97     with TigerInput(data_dir) as tar:
 
 101         with connect(dsn) as conn:
 
 102             sql = SQLPreprocessor(conn, config)
 
 103             sql.run_sql_file(conn, 'tiger_import_start.sql')
 
 105         # Reading files and then for each file line handling
 
 106         # sql_query in <threads - 1> chunks.
 
 107         place_threads = max(1, threads - 1)
 
 109         async with QueryPool(dsn, place_threads, autocommit=True) as pool:
 
 110             with tokenizer.name_analyzer() as analyzer:
 
 111                 for lineno, row in enumerate(tar, 1):
 
 113                         address = dict(street=row['street'], postcode=row['postcode'])
 
 114                         args = ('SRID=4326;' + row['geometry'],
 
 115                                 int(row['from']), int(row['to']), row['interpolation'],
 
 116                                 Json(analyzer.process_place(PlaceInfo({'address': address}))),
 
 117                                 analyzer.normalize_postcode(row['postcode']))
 
 121                     await pool.put_query(
 
 122                         """SELECT tiger_line_import(%s::GEOMETRY, %s::INT,
 
 123                                                     %s::INT, %s::TEXT, %s::JSONB, %s::TEXT)""",
 
 126                     if not lineno % 1000:
 
 127                         print('.', end='', flush=True)
 
 129         print('', flush=True)
 
 131     LOG.warning("Creating indexes on Tiger data")
 
 132     with connect(dsn) as conn:
 
 133         sql = SQLPreprocessor(conn, config)
 
 134         sql.run_sql_file(conn, 'tiger_import_finish.sql')