1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Tests for status table manipulation.
 
  14 import nominatim.db.status
 
  15 from nominatim.errors import UsageError
 
  18 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
 
  19 <node id="45673" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
 
  25     return dt.datetime.strptime(date, nominatim.db.status.ISODATE_FORMAT)\
 
  26                .replace(tzinfo=dt.timezone.utc)
 
  29 @pytest.fixture(autouse=True)
 
  30 def setup_status_table(status_table):
 
  34 def test_compute_database_date_place_empty(place_table, temp_db_conn):
 
  35     with pytest.raises(UsageError):
 
  36         nominatim.db.status.compute_database_date(temp_db_conn)
 
  39 def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
 
  40     place_row(osm_type='N', osm_id=45673)
 
  44         requested_url.append(url)
 
  47     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
 
  49     date = nominatim.db.status.compute_database_date(temp_db_conn)
 
  51     assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
 
  52     assert date == iso_date('2006-01-27T22:09:10')
 
  55 def test_compute_database_broken_api(monkeypatch, place_row, temp_db_conn):
 
  56     place_row(osm_type='N', osm_id=45673)
 
  60         requested_url.append(url)
 
  61         return '<osm version="0.6" generator="OpenStre'
 
  63     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
 
  65     with pytest.raises(UsageError):
 
  66         nominatim.db.status.compute_database_date(temp_db_conn)
 
  69 def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
 
  70     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
  71     nominatim.db.status.set_status(temp_db_conn, date=date)
 
  73     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
 
  77 def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
 
  78     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
  79     nominatim.db.status.set_status(temp_db_conn, date=date)
 
  81     assert temp_db_cursor.table_rows('import_status') == 1
 
  83     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
 
  84     nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
 
  86     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
 
  90 def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
 
  91     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
  92     nominatim.db.status.set_status(temp_db_conn, date=date)
 
  94     assert temp_db_cursor.table_rows('import_status') == 1
 
  96     nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
 
  98     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
 
 102 def test_get_status_empty_table(temp_db_conn):
 
 103     assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None)
 
 106 def test_get_status_success(temp_db_conn):
 
 107     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
 108     nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
 
 110     assert nominatim.db.status.get_status(temp_db_conn) == \
 
 114 @pytest.mark.parametrize("old_state", [True, False])
 
 115 @pytest.mark.parametrize("new_state", [True, False])
 
 116 def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
 
 117     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
 118     nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
 
 119     nominatim.db.status.set_indexed(temp_db_conn, new_state)
 
 121     assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
 
 124 def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
 
 125     nominatim.db.status.set_indexed(temp_db_conn, True)
 
 127     assert temp_db_cursor.table_rows("import_status") == 0
 
 130 def test_log_status(temp_db_conn, temp_db_cursor):
 
 131     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
 132     start = dt.datetime.now() - dt.timedelta(hours=1)
 
 134     nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
 
 135     nominatim.db.status.log_status(temp_db_conn, start, 'index')
 
 137     temp_db_conn.commit()
 
 139     assert temp_db_cursor.table_rows("import_osmosis_log") == 1
 
 140     assert temp_db_cursor.scalar("SELECT batchseq FROM import_osmosis_log") == 56
 
 141     assert temp_db_cursor.scalar("SELECT event FROM import_osmosis_log") == 'index'