]> git.openstreetmap.org Git - nominatim.git/commitdiff
move WorkerPool into db module
authorSarah Hoffmann <lonvia@denofr.de>
Thu, 13 May 2021 15:11:17 +0000 (17:11 +0200)
committerSarah Hoffmann <lonvia@denofr.de>
Thu, 13 May 2021 15:11:17 +0000 (17:11 +0200)
The pool is independent of the indexer and may also be used
by other parts of the software.

nominatim/db/async_connection.py
nominatim/indexer/indexer.py

index a4f554965ab9144a89bb609296ee80ca82049ac3..fe8b30068d497322930b8e237c874c875b02edc0 100644 (file)
@@ -6,6 +6,9 @@
 """ Database helper functions for the indexer.
 """
 import logging
+import select
+import time
+
 import psycopg2
 from psycopg2.extras import wait_select
 
@@ -131,3 +134,71 @@ class DBConnection:
                 return True
 
         return False
+
+
+class WorkerPool:
+    """ A pool of asynchronous database connections.
+
+        The pool may be used as a context manager.
+    """
+    REOPEN_CONNECTIONS_AFTER = 100000
+
+    def __init__(self, dsn, pool_size):
+        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
+        self.free_workers = self._yield_free_worker()
+        self.wait_time = 0
+
+
+    def finish_all(self):
+        """ Wait for all connection to finish.
+        """
+        for thread in self.threads:
+            while not thread.is_done():
+                thread.wait()
+
+        self.free_workers = self._yield_free_worker()
+
+    def close(self):
+        """ Close all connections and clear the pool.
+        """
+        for thread in self.threads:
+            thread.close()
+        self.threads = []
+        self.free_workers = None
+
+
+    def next_free_worker(self):
+        """ Get the next free connection.
+        """
+        return next(self.free_workers)
+
+
+    def _yield_free_worker(self):
+        ready = self.threads
+        command_stat = 0
+        while True:
+            for thread in ready:
+                if thread.is_done():
+                    command_stat += 1
+                    yield thread
+
+            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
+                for thread in self.threads:
+                    while not thread.is_done():
+                        thread.wait()
+                    thread.connect()
+                ready = self.threads
+                command_stat = 0
+            else:
+                tstart = time.time()
+                _, ready, _ = select.select([], self.threads, [])
+                self.wait_time += time.time() - tstart
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.finish_all()
+        self.close()
index b7673abaddc8090a896351c7ad230f372742a739..5ab0eac3dca5701562085ffecdce652e110f9cfd 100644 (file)
@@ -2,14 +2,13 @@
 Main work horse for indexing (computing addresses) the database.
 """
 import logging
-import select
 import time
 
 import psycopg2.extras
 
 from nominatim.indexer.progress import ProgressLogger
 from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
 from nominatim.db.connection import connect
 
 LOG = logging.getLogger()
@@ -81,73 +80,6 @@ class PlaceFetcher:
         self.conn.wait()
         self.close()
 
-class WorkerPool:
-    """ A pool of asynchronous database connections.
-
-        The pool may be used as a context manager.
-    """
-    REOPEN_CONNECTIONS_AFTER = 100000
-
-    def __init__(self, dsn, pool_size):
-        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
-        self.free_workers = self._yield_free_worker()
-        self.wait_time = 0
-
-
-    def finish_all(self):
-        """ Wait for all connection to finish.
-        """
-        for thread in self.threads:
-            while not thread.is_done():
-                thread.wait()
-
-        self.free_workers = self._yield_free_worker()
-
-    def close(self):
-        """ Close all connections and clear the pool.
-        """
-        for thread in self.threads:
-            thread.close()
-        self.threads = []
-        self.free_workers = None
-
-
-    def next_free_worker(self):
-        """ Get the next free connection.
-        """
-        return next(self.free_workers)
-
-
-    def _yield_free_worker(self):
-        ready = self.threads
-        command_stat = 0
-        while True:
-            for thread in ready:
-                if thread.is_done():
-                    command_stat += 1
-                    yield thread
-
-            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
-                for thread in self.threads:
-                    while not thread.is_done():
-                        thread.wait()
-                    thread.connect()
-                ready = self.threads
-                command_stat = 0
-            else:
-                tstart = time.time()
-                _, ready, _ = select.select([], self.threads, [])
-                self.wait_time += time.time() - tstart
-
-
-    def __enter__(self):
-        return self
-
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.finish_all()
-        self.close()
-
 
 class Indexer:
     """ Main indexing routine.