From 2dd8433ab6ce93dbedf23b4915d785f081bb37cb Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Tue, 8 Nov 2022 21:45:36 +0100 Subject: [PATCH] fix timeout use for replication timeout The timeout parameter is no longer taken into account since pyosmium switched to the requests library. This adds the parameter back. --- nominatim/clicmd/replication.py | 2 +- nominatim/tools/replication.py | 41 +++++++++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/nominatim/clicmd/replication.py b/nominatim/clicmd/replication.py index 2d6396a1..235817de 100644 --- a/nominatim/clicmd/replication.py +++ b/nominatim/clicmd/replication.py @@ -148,7 +148,7 @@ class UpdateReplication: while True: with connect(args.config.get_libpq_dsn()) as conn: start = dt.datetime.now(dt.timezone.utc) - state = replication.update(conn, params) + state = replication.update(conn, params, socket_timeout=args.socket_timeout) if state is not replication.UpdateState.NO_CHANGES: status.log_status(conn, start, 'import') batchdate, _, _ = status.get_status(conn) diff --git a/nominatim/tools/replication.py b/nominatim/tools/replication.py index db706bf6..1c46c50c 100644 --- a/nominatim/tools/replication.py +++ b/nominatim/tools/replication.py @@ -7,13 +7,16 @@ """ Functions for updating a database from a replication source. """ -from typing import ContextManager, MutableMapping, Any, Generator, cast +from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator from contextlib import contextmanager import datetime as dt from enum import Enum import logging import time +import types +import urllib.request as urlrequest +import requests from nominatim.db import status from nominatim.db.connection import Connection from nominatim.tools.exec_utils import run_osm2pgsql @@ -22,6 +25,7 @@ from nominatim.errors import UsageError try: from osmium.replication.server import ReplicationServer from osmium import WriteHandler + from osmium import version as pyo_version except ImportError as exc: logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n" "To install pyosmium via pip: pip3 install osmium") @@ -86,7 +90,8 @@ class UpdateState(Enum): NO_CHANGES = 3 -def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState: +def update(conn: Connection, options: MutableMapping[str, Any], + socket_timeout: int = 60) -> UpdateState: """ Update database from the next batch of data. Returns the state of updates according to `UpdateState`. """ @@ -114,7 +119,7 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState: options['import_file'].unlink() # Read updates into file. - with _make_replication_server(options['base_url']) as repl: + with _make_replication_server(options['base_url'], socket_timeout) as repl: outhandler = WriteHandler(str(options['import_file'])) endseq = repl.apply_diffs(outhandler, startseq + 1, max_size=options['max_diff_size'] * 1024) @@ -136,14 +141,40 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState: return UpdateState.UP_TO_DATE -def _make_replication_server(url: str) -> ContextManager[ReplicationServer]: +def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]: """ Returns a ReplicationServer in form of a context manager. Creates a light wrapper around older versions of pyosmium that did not support the context manager interface. """ if hasattr(ReplicationServer, '__enter__'): - return cast(ContextManager[ReplicationServer], ReplicationServer(url)) + # Patches the open_url function for pyosmium >= 3.2 + # where the socket timeout is no longer respected. + def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any: + """ Download a resource from the given URL and return a byte sequence + of the content. + """ + get_params = { + 'headers': {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"}, + 'timeout': timeout or None, + 'stream': True + } + + if self.session is not None: + return self.session.get(url.get_full_url(), **get_params) + + @contextmanager + def _get_url_with_session() -> Iterator[requests.Response]: + with requests.Session() as session: + request = session.get(url.get_full_url(), **get_params) # type: ignore + yield request + + return _get_url_with_session() + + repl = ReplicationServer(url) + repl.open_url = types.MethodType(patched_open_url, repl) + + return cast(ContextManager[ReplicationServer], repl) @contextmanager def get_cm() -> Generator[ReplicationServer, None, None]: -- 2.45.1