1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Access and helper functions for the status and status log table.
 
  10 from typing import Optional, Tuple, cast
 
  15 from nominatim.db.connection import Connection
 
  16 from nominatim.tools.exec_utils import get_url
 
  17 from nominatim.errors import UsageError
 
  18 from nominatim.typing import TypedDict
 
  20 LOG = logging.getLogger()
 
  21 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
  24 class StatusRow(TypedDict):
 
  25     """ Dictionary of columns of the import_status table.
 
  27     lastimportdate: dt.datetime
 
  28     sequence_id: Optional[int]
 
  29     indexed: Optional[bool]
 
  32 def compute_database_date(conn: Connection) -> dt.datetime:
 
  33     """ Determine the date of the database from the newest object in the
 
  36     # First, find the node with the highest ID in the database
 
  37     with conn.cursor() as cur:
 
  38         if conn.table_exists('place'):
 
  39             osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'")
 
  41             osmid = cur.scalar("SELECT max(osm_id) FROM placex WHERE osm_type='N'")
 
  44             LOG.fatal("No data found in the database.")
 
  45             raise UsageError("No data found in the database.")
 
  47     LOG.info("Using node id %d for timestamp lookup", osmid)
 
  48     # Get the node from the API to find the timestamp when it was created.
 
  49     node_url = f'https://www.openstreetmap.org/api/0.6/node/{osmid}/1'
 
  50     data = get_url(node_url)
 
  52     match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
 
  55         LOG.fatal("The node data downloaded from the API does not contain valid data.\n"
 
  56                   "URL used: %s", node_url)
 
  57         raise UsageError("Bad API data.")
 
  59     LOG.debug("Found timestamp %s", match.group(1))
 
  61     return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
 
  64 def set_status(conn: Connection, date: Optional[dt.datetime],
 
  65                seq: Optional[int] = None, indexed: bool = True) -> None:
 
  66     """ Replace the current status with the given status. If date is `None`
 
  67         then only sequence and indexed will be updated as given. Otherwise
 
  68         the whole status is replaced.
 
  69         The change will be committed to the database.
 
  71     assert date is None or date.tzinfo == dt.timezone.utc
 
  72     with conn.cursor() as cur:
 
  74             cur.execute("UPDATE import_status set sequence_id = %s, indexed = %s",
 
  77             cur.execute("TRUNCATE TABLE import_status")
 
  78             cur.execute("""INSERT INTO import_status (lastimportdate, sequence_id, indexed)
 
  79                            VALUES (%s, %s, %s)""", (date, seq, indexed))
 
  84 def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], Optional[bool]]:
 
  85     """ Return the current status as a triple of (date, sequence, indexed).
 
  86         If status has not been set up yet, a triple of None is returned.
 
  88     with conn.cursor() as cur:
 
  89         cur.execute("SELECT * FROM import_status LIMIT 1")
 
  91             return None, None, None
 
  93         row = cast(StatusRow, cur.fetchone())
 
  94         return row['lastimportdate'], row['sequence_id'], row['indexed']
 
  97 def set_indexed(conn: Connection, state: bool) -> None:
 
  98     """ Set the indexed flag in the status table to the given state.
 
 100     with conn.cursor() as cur:
 
 101         cur.execute("UPDATE import_status SET indexed = %s", (state, ))
 
 105 def log_status(conn: Connection, start: dt.datetime,
 
 106                event: str, batchsize: Optional[int] = None) -> None:
 
 107     """ Write a new status line to the `import_osmosis_log` table.
 
 109     with conn.cursor() as cur:
 
 110         cur.execute("""INSERT INTO import_osmosis_log
 
 111                        (batchend, batchseq, batchsize, starttime, endtime, event)
 
 112                        SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
 
 113                     (batchsize, start, event))