git.openstreetmap.org Git - nominatim.git/commitdiff
use WorkerPool for Tiger data import
author Sarah Hoffmann <lonvia@denofr.de>
Thu, 13 May 2021 18:16:30 +0000 (20:16 +0200)
committer Sarah Hoffmann <lonvia@denofr.de>
Thu, 13 May 2021 18:36:50 +0000 (20:36 +0200)
Requires adding an option to ignore SQL errors.
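
For context, this is how the new option is meant to be used from the Tiger
import below: the pool keeps executing statements even when individual ones
fail. A minimal usage sketch (the DSN and the statement list are
placeholders, not part of this commit):

    from nominatim.db.async_connection import WorkerPool

    dsn = 'dbname=nominatim'            # placeholder DSN
    statements = ["SELECT 'one statement per input line';"]

    # ignore_sql_errors=True logs and skips failing statements instead
    # of aborting the whole import on the first SQL error.
    with WorkerPool(dsn, 4, ignore_sql_errors=True) as pool:
        for sql in statements:
            pool.next_free_worker().perform(sql)
    # leaving the with-block waits for all workers and closes them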

nominatim/db/async_connection.py
nominatim/tools/tiger_data.py
test/python/test_db_async_connection.py

diff --git a/nominatim/db/async_connection.py b/nominatim/db/async_connection.py
index fe8b30068d497322930b8e237c874c875b02edc0..db4b89ce2fa74b6a03763dccac1f00df5f8ef22e 100644
--- a/nominatim/db/async_connection.py
+++ b/nominatim/db/async_connection.py
@@ -28,8 +28,9 @@ class DeadlockHandler:
         normally.
     """
 
-    def __init__(self, handler):
+    def __init__(self, handler, ignore_sql_errors=False):
         self.handler = handler
+        self.ignore_sql_errors = ignore_sql_errors
 
     def __enter__(self):
         pass
@@ -44,6 +45,11 @@ class DeadlockHandler:
                 if exc_value.pgcode == '40P01':
                     self.handler()
                     return True
+
+        if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
+            LOG.info("SQL error ignored: %s", exc_value)
+            return True
+
         return False
 
 
@@ -51,10 +57,11 @@ class DBConnection:
     """ A single non-blocking database connection.
     """
 
-    def __init__(self, dsn, cursor_factory=None):
+    def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
         self.current_query = None
         self.current_params = None
         self.dsn = dsn
+        self.ignore_sql_errors = ignore_sql_errors
 
         self.conn = None
         self.cursor = None
@@ -101,7 +108,7 @@ class DBConnection:
         """ Block until any pending operation is done.
         """
         while True:
-            with DeadlockHandler(self._deadlock_handler):
+            with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
                 wait_select(self.conn)
                 self.current_query = None
                 return
@@ -128,7 +135,7 @@ class DBConnection:
         if self.current_query is None:
             return True
 
-        with DeadlockHandler(self._deadlock_handler):
+        with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True
@@ -143,8 +150,9 @@ class WorkerPool:
     """
     REOPEN_CONNECTIONS_AFTER = 100000
 
-    def __init__(self, dsn, pool_size):
-        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
+    def __init__(self, dsn, pool_size, ignore_sql_errors=False):
+        self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
+                        for _ in range(pool_size)]
         self.free_workers = self._yield_free_worker()
         self.wait_time = 0
 
diff --git a/nominatim/tools/tiger_data.py b/nominatim/tools/tiger_data.py
index 07772c702a53f3a554cb8a42f54f9de388da56cd..fbcdb077bc12674782d6b0bc871538cad8b44e2f 100644
--- a/nominatim/tools/tiger_data.py
+++ b/nominatim/tools/tiger_data.py
@@ -4,10 +4,9 @@ Functions for importing tiger data and handling tarbar and directory files
 import logging
 import os
 import tarfile
-import selectors
 
 from nominatim.db.connection import connect
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import WorkerPool
 from nominatim.db.sql_preprocessor import SQLPreprocessor
 
 
@@ -37,44 +36,20 @@ def handle_tarfile_or_directory(data_dir):
     return sql_files, tar
 
 
-def handle_threaded_sql_statements(sel, file):
+def handle_threaded_sql_statements(pool, file):
     """ Handles sql statement with multiplexing
     """
 
     lines = 0
-    end_of_file = False
     # Using pool of database connections to execute sql statements
-    while not end_of_file:
-        for key, _ in sel.select(1):
-            conn = key.data
-            try:
-                if conn.is_done():
-                    sql_query = file.readline()
-                    lines += 1
-                    if not sql_query:
-                        end_of_file = True
-                        break
-                    conn.perform(sql_query)
-                    if lines == 1000:
-                        print('. ', end='', flush=True)
-                        lines = 0
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-
-def handle_unregister_connection_pool(sel, place_threads):
-    """ Handles unregistering pool of connections
-    """
+    for sql_query in file:
+        pool.next_free_worker().perform(sql_query)
+
+        lines += 1
+        if lines == 1000:
+            print('.', end='', flush=True)
+            lines = 0
 
-    while place_threads > 0:
-        for key, _ in sel.select(1):
-            conn = key.data
-            sel.unregister(conn)
-            try:
-                conn.wait()
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-            conn.close()
-            place_threads -= 1
 
 def add_tiger_data(data_dir, config, threads):
     """ Import tiger data from directory or tar file `data dir`.
@@ -91,25 +66,16 @@ def add_tiger_data(data_dir, config, threads):
 
     # Reading sql_files and then for each file line handling
     # sql_query in <threads - 1> chunks.
-    sel = selectors.DefaultSelector()
     place_threads = max(1, threads - 1)
 
-    # Creates a pool of database connections
-    for _ in range(place_threads):
-        conn = DBConnection(dsn)
-        conn.connect()
-        sel.register(conn, selectors.EVENT_WRITE, conn)
-
-    for sql_file in sql_files:
-        if not tar:
-            file = open(sql_file)
-        else:
-            file = tar.extractfile(sql_file)
-
-        handle_threaded_sql_statements(sel, file)
+    with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
+        for sql_file in sql_files:
+            if not tar:
+                file = open(sql_file)
+            else:
+                file = tar.extractfile(sql_file)
 
-    # Unregistering pool of database connections
-    handle_unregister_connection_pool(sel, place_threads)
+            handle_threaded_sql_statements(pool, file)
 
     if tar:
         tar.close()
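
The deleted handle_unregister_connection_pool() has no replacement in this
file because its job moves into WorkerPool's context-manager exit, which is
not part of this diff. Presumably it waits for every connection and then
closes it, along the lines of this sketch; the method names finish_all and
close are assumptions, only the with-statement usage above is certain
(is_done(), wait() and close() on the connections appear in the deleted
code):

    class WorkerPool:
        # ... constructor as shown in the first diff ...

        def finish_all(self):
            # Hypothetical: block until every worker has drained its
            # current query; mirrors the deleted conn.wait() loop.
            for thread in self.threads:
                while not thread.is_done():
                    thread.wait()

        def close(self):
            # Hypothetical: mirrors the deleted conn.close() calls.
            for thread in self.threads:
                thread.close()

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            self.finish_all()
            self.close()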
diff --git a/test/python/test_db_async_connection.py b/test/python/test_db_async_connection.py
index b52f7053f4a8d601facfbf48a924ae564b7688e3..330b86f7cb46e4e324de82467d461523e0dd94b8 100644
--- a/test/python/test_db_async_connection.py
+++ b/test/python/test_db_async_connection.py
@@ -56,13 +56,21 @@ def test_bad_query(conn):
         conn.wait()
 
 
+def test_bad_query_ignore(temp_db):
+    with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
+        conn.connect()
+
+        conn.perform('SELECT efasfjsea')
+
+        conn.wait()
+
+
 def exec_with_deadlock(cur, sql, detector):
     with DeadlockHandler(lambda *args: detector.append(1)):
         cur.execute(sql)
 
 
 def test_deadlock(simple_conns):
-    print(psycopg2.__version__)
     cur1, cur2 = simple_conns
 
     cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
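
The new test_bad_query_ignore only differs from the existing test_bad_query
in that the failing statement must now pass silently. For contrast, the
default path is expected to raise; a standalone sketch of that expectation
(the exact exception type is an assumption based on psycopg2's usual
behaviour for invalid identifiers):

    import psycopg2
    import pytest
    from contextlib import closing

    from nominatim.db.async_connection import DBConnection

    def test_bad_query_raises(temp_db):
        # without ignore_sql_errors, wait() re-raises the SQL error
        with closing(DBConnection('dbname=' + temp_db)) as conn:
            conn.connect()
            conn.perform('SELECT efasfjsea')
            with pytest.raises(psycopg2.ProgrammingError):
                conn.wait()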