X-Git-Url: https://git.openstreetmap.org/nominatim.git/blobdiff_plain/a7a5f0161f27833513b81466fd5d4c57cd7ee4d4..bef1aebf1c74277f3f9b8744a2a92dca0ef3f255:/nominatim/db/sql_preprocessor.py diff --git a/nominatim/db/sql_preprocessor.py b/nominatim/db/sql_preprocessor.py index 4de53886..31b4a8c0 100644 --- a/nominatim/db/sql_preprocessor.py +++ b/nominatim/db/sql_preprocessor.py @@ -7,10 +7,14 @@ """ Preprocessing of SQL files. """ +from typing import Set, Dict, Any import jinja2 +from nominatim.db.connection import Connection +from nominatim.db.async_connection import WorkerPool +from nominatim.config import Configuration -def _get_partitions(conn): +def _get_partitions(conn: Connection) -> Set[int]: """ Get the set of partitions currently in use. """ with conn.cursor() as cur: @@ -22,7 +26,7 @@ def _get_partitions(conn): return partitions -def _get_tables(conn): +def _get_tables(conn: Connection) -> Set[str]: """ Return the set of tables currently in use. Only includes non-partitioned """ @@ -32,7 +36,7 @@ def _get_tables(conn): return set((row[0] for row in list(cur))) -def _setup_tablespace_sql(config): +def _setup_tablespace_sql(config: Configuration) -> Dict[str, str]: """ Returns a dict with tablespace expressions for the different tablespace kinds depending on whether a tablespace is configured or not. """ @@ -47,7 +51,7 @@ def _setup_tablespace_sql(config): return out -def _setup_postgresql_features(conn): +def _setup_postgresql_features(conn: Connection) -> Dict[str, Any]: """ Set up a dictionary with various optional Postgresql/Postgis features that depend on the database version. """ @@ -69,11 +73,11 @@ class SQLPreprocessor: and follows its syntax. 
""" - def __init__(self, conn, config): + def __init__(self, conn: Connection, config: Configuration) -> None: self.env = jinja2.Environment(autoescape=False, loader=jinja2.FileSystemLoader(str(config.lib_dir.sql))) - db_info = {} + db_info: Dict[str, Any] = {} db_info['partitions'] = _get_partitions(conn) db_info['tables'] = _get_tables(conn) db_info['reverse_only'] = 'search_name' not in db_info['tables'] @@ -84,7 +88,7 @@ class SQLPreprocessor: self.env.globals['postgres'] = _setup_postgresql_features(conn) - def run_sql_file(self, conn, name, **kwargs): + def run_sql_file(self, conn: Connection, name: str, **kwargs: Any) -> None: """ Execute the given SQL file on the connection. The keyword arguments may supply additional parameters for preprocessing. """ @@ -93,3 +97,21 @@ class SQLPreprocessor: with conn.cursor() as cur: cur.execute(sql) conn.commit() + + + def run_parallel_sql_file(self, dsn: str, name: str, num_threads: int = 1, + **kwargs: Any) -> None: + """ Execute the given SQL files using parallel asynchronous connections. + The keyword arguments may supply additional parameters for + preprocessing. + + After preprocessing the SQL code is cut at lines containing only + '---'. Each chunk is sent to one of the `num_threads` workers. + """ + sql = self.env.get_template(name).render(**kwargs) + + parts = sql.split('\n---\n') + + with WorkerPool(dsn, num_threads) as pool: + for part in parts: + pool.next_free_worker().perform(part)