]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/tiger_data.py
Merge pull request #2326 from lonvia/wokerpool-for-tiger-data
[nominatim.git] / nominatim / tools / tiger_data.py
1 """
2 Functions for importing tiger data and handling tarbar and directory files
3 """
4 import logging
5 import os
6 import tarfile
7
8 from nominatim.db.connection import connect
9 from nominatim.db.async_connection import WorkerPool
10 from nominatim.db.sql_preprocessor import SQLPreprocessor
11
12
13 LOG = logging.getLogger()
14
15
16 def handle_tarfile_or_directory(data_dir):
17     """ Handles tarfile or directory for importing tiger data
18     """
19
20     tar = None
21     if data_dir.endswith('.tar.gz'):
22         tar = tarfile.open(data_dir)
23         sql_files = [i for i in tar.getmembers() if i.name.endswith('.sql')]
24         LOG.warning("Found %d SQL files in tarfile with path %s", len(sql_files), data_dir)
25         if not sql_files:
26             LOG.warning("Tiger data import selected but no files in tarfile's path %s", data_dir)
27             return None, None
28     else:
29         files = os.listdir(data_dir)
30         sql_files = [os.path.join(data_dir, i) for i in files if i.endswith('.sql')]
31         LOG.warning("Found %d SQL files in path %s", len(sql_files), data_dir)
32         if not sql_files:
33             LOG.warning("Tiger data import selected but no files found in path %s", data_dir)
34             return None, None
35
36     return sql_files, tar
37
38
39 def handle_threaded_sql_statements(pool, file):
40     """ Handles sql statement with multiplexing
41     """
42
43     lines = 0
44     # Using pool of database connections to execute sql statements
45     for sql_query in file:
46         pool.next_free_worker().perform(sql_query)
47
48         lines += 1
49         if lines == 1000:
50             print('.', end='', flush=True)
51             lines = 0
52
53
54 def add_tiger_data(data_dir, config, threads):
55     """ Import tiger data from directory or tar file `data dir`.
56     """
57     dsn = config.get_libpq_dsn()
58     sql_files, tar = handle_tarfile_or_directory(data_dir)
59
60     if not sql_files:
61         return
62
63     with connect(dsn) as conn:
64         sql = SQLPreprocessor(conn, config)
65         sql.run_sql_file(conn, 'tiger_import_start.sql')
66
67     # Reading sql_files and then for each file line handling
68     # sql_query in <threads - 1> chunks.
69     place_threads = max(1, threads - 1)
70
71     with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
72         for sql_file in sql_files:
73             if not tar:
74                 file = open(sql_file)
75             else:
76                 file = tar.extractfile(sql_file)
77
78             handle_threaded_sql_statements(pool, file)
79
80     if tar:
81         tar.close()
82     print('\n')
83     LOG.warning("Creating indexes on Tiger data")
84     with connect(dsn) as conn:
85         sql = SQLPreprocessor(conn, config)
86         sql.run_sql_file(conn, 'tiger_import_finish.sql')