git.openstreetmap.org Git - nominatim.git/commitdiff
introduce external processing in indexer
author Sarah Hoffmann <lonvia@denofr.de>
Fri, 23 Apr 2021 13:49:38 +0000 (15:49 +0200)
committer Sarah Hoffmann <lonvia@denofr.de>
Fri, 30 Apr 2021 09:30:51 +0000 (11:30 +0200)
Indexing is now split into three parts: first a preparation step
that collects the necessary information from the database and
returns it to Python. In a second step the data is transformed
within Python as necessary and then returned to the database
through the usual UPDATE which now not only sets the indexed_status
but also other fields. The third step comprises the address
computation which is still done inside the update trigger in
the database.

The second processing step doesn't do anything useful yet.

lib-sql/functions/placex_triggers.sql
nominatim/indexer/indexer.py
nominatim/indexer/runners.py
test/bdd/steps/nominatim_environment.py
test/bdd/steps/steps_db_ops.py
test/python/test_indexing.py

index 812bc79ff5c44314858af1ae350ec7d2c8cd0597..5ffd4e4bc23615322866ec9d30b180d1f6e2ad59 100644 (file)
@@ -1,5 +1,51 @@
 -- Trigger functions for the placex table.
 
+-- Retrieve the data needed by the indexer for updating the place.
+--
+-- Return parameters:
+--  name            list of names
+--  address         list of address tags, either from the object or a surrounding
+--                  building
+--  country_feature If the place is a country feature, this contains the
+--                  country code, otherwise it is null.
+CREATE OR REPLACE FUNCTION placex_prepare_update(p placex,
+                                                 OUT name HSTORE,
+                                                 OUT address HSTORE,
+                                                 OUT country_feature VARCHAR)
+  AS $$
+BEGIN
+  -- For POI nodes, check if the address should be derived from a surrounding
+  -- building.
+  IF p.rank_search < 30 OR p.osm_type != 'N' OR p.address is not null THEN
+    {% if debug %}RAISE WARNING 'self address for % %', p.osm_type, p.osm_id;{% endif %}
+    address := p.address;
+  ELSE
+    -- The additional && condition works around the misguided query
+    -- planner of postgis 3.0.
+    SELECT placex.address || hstore('_inherited', '') INTO address
+      FROM placex
+     WHERE ST_Covers(geometry, p.centroid)
+           and geometry && p.centroid
+           and (placex.address ? 'housenumber' or placex.address ? 'street' or placex.address ? 'place')
+           and rank_search > 28 AND ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon')
+     LIMIT 1;
+    {% if debug %}RAISE WARNING 'other address for % %: % (%)', p.osm_type, p.osm_id, address, p.centroid;{% endif %}
+  END IF;
+
+  address := address - '_unlisted_place'::TEXT;
+  name := p.name;
+
+  country_feature := CASE WHEN p.admin_level = 2
+                               and p.class = 'boundary' and p.type = 'administrative'
+                               and p.osm_type = 'R'
+                          THEN p.country_code
+                          ELSE null
+                     END;
+END;
+$$
+LANGUAGE plpgsql STABLE;
+
+
 -- Find the parent road of a POI.
 --
 -- \returns Place ID of parent object or NULL if none
@@ -397,10 +443,11 @@ BEGIN
   NEW.place_id := nextval('seq_place');
   NEW.indexed_status := 1; --STATUS_NEW
 
-  NEW.country_code := lower(get_country_code(NEW.geometry));
+  NEW.centroid := ST_PointOnSurface(NEW.geometry);
+  NEW.country_code := lower(get_country_code(NEW.centroid));
 
   NEW.partition := get_partition(NEW.country_code);
-  NEW.geometry_sector := geometry_sector(NEW.partition, NEW.geometry);
+  NEW.geometry_sector := geometry_sector(NEW.partition, NEW.centroid);
 
   IF NEW.osm_type = 'X' THEN
     -- E'X'ternal records should already be in the right format so do nothing
@@ -531,8 +578,6 @@ DECLARE
   nameaddress_vector INTEGER[];
   addr_nameaddress_vector INTEGER[];
 
-  inherited_address HSTORE;
-
   linked_node_id BIGINT;
   linked_importance FLOAT;
   linked_wikipedia TEXT;
@@ -566,7 +611,6 @@ BEGIN
   -- update not necessary for osmline, cause linked_place_id does not exist
 
   NEW.extratags := NEW.extratags - 'linked_place'::TEXT;
-  NEW.address := NEW.address - '_unlisted_place'::TEXT;
 
   IF NEW.linked_place_id is not null THEN
     {% if debug %}RAISE WARNING 'place already linked to %', NEW.linked_place_id;{% endif %}
@@ -750,27 +794,6 @@ BEGIN
     {% if debug %}RAISE WARNING 'finding street for % %', NEW.osm_type, NEW.osm_id;{% endif %}
     NEW.parent_place_id := null;
 
-    -- if we have a POI and there is no address information,
-    -- see if we can get it from a surrounding building
-    inherited_address := ''::HSTORE;
-    IF NEW.osm_type = 'N' AND addr_street IS NULL AND addr_place IS NULL
-       AND NEW.housenumber IS NULL THEN
-      FOR location IN
-        -- The additional && condition works around the misguided query
-        -- planner of postgis 3.0.
-        SELECT address from placex where ST_Covers(geometry, NEW.centroid)
-            and geometry && NEW.centroid
-            and (address ? 'housenumber' or address ? 'street' or address ? 'place')
-            and rank_search > 28 AND ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon')
-            limit 1
-      LOOP
-        NEW.housenumber := location.address->'housenumber';
-        addr_street := location.address->'street';
-        addr_place := location.address->'place';
-        inherited_address := location.address;
-      END LOOP;
-    END IF;
-
     -- We have to find our parent road.
     NEW.parent_place_id := find_parent_for_poi(NEW.osm_type, NEW.osm_id,
                                                NEW.partition,
@@ -823,12 +846,12 @@ BEGIN
 
       {% if not db.reverse_only %}
       IF array_length(name_vector, 1) is not NULL
-         OR inherited_address is not NULL OR NEW.address is not NULL
+         OR NEW.address is not NULL
       THEN
         SELECT * INTO name_vector, nameaddress_vector
           FROM create_poi_search_terms(NEW.place_id,
                                        NEW.partition, NEW.parent_place_id,
-                                       inherited_address || NEW.address,
+                                       NEW.address,
                                        NEW.country_code, NEW.housenumber,
                                        name_vector, NEW.centroid);
 
@@ -844,6 +867,16 @@ BEGIN
       END IF;
       {% endif %}
 
+      -- If the address was inherited from a surrounding building,
+      -- do not add it permanently to the table.
+      IF NEW.address ? '_inherited' THEN
+        IF NEW.address ? '_unlisted_place' THEN
+          NEW.address := hstore('_unlisted_place', NEW.address->'_unlisted_place');
+        ELSE
+          NEW.address := null;
+        END IF;
+      END IF;
+
       RETURN NEW;
     END IF;
 
index faf91c456b1eb9bd0d6a00d8e1f396f8fd846ab7..cfa48433fd120c664ecd7aea3b432aec329a3261 100644 (file)
@@ -187,14 +187,14 @@ class Indexer:
 
                     with WorkerPool(self.dsn, self.num_threads) as pool:
                         while True:
-                            places = [p[0] for p in cur.fetchmany(batch)]
+                            places = [p for p in cur.fetchmany(batch)]
                             if not places:
                                 break
 
                             LOG.debug("Processing places: %s", str(places))
                             worker = pool.next_free_worker()
 
-                            worker.perform(runner.sql_index_place(places))
+                            runner.index_places(worker, places)
                             progress.add(len(places))
 
                         pool.finish_all()
index 3c853cd0a19130b8bda5ab82ed539193bf75fa40..dd6ced3881f1bdf25f0df5fa0df96e5d419b2077 100644 (file)
@@ -4,12 +4,40 @@ tasks.
 """
 # pylint: disable=C0111
 
-class RankRunner:
-    """ Returns SQL commands for indexing one rank within the placex table.
+class AbstractPlacexRunner:
+    """ Returns SQL commands for indexing of the placex table.
     """
+    SELECT_SQL = 'SELECT place_id, (placex_prepare_update(placex)).* FROM placex'
 
     def __init__(self, rank):
         self.rank = rank
+        self._sql_terms = 0
+        self._cached_index_sql = None
+
+    def _index_sql(self, num_places):
+        if num_places != self._sql_terms:
+            self._cached_index_sql = \
+                """ UPDATE placex
+                    SET indexed_status = 0, address = v.addr
+                    FROM (VALUES {}) as v(id, addr)
+                    WHERE place_id = v.id
+                """.format(','.join(["(%s, %s::hstore)"]  * num_places))
+            self._sql_terms = num_places
+
+        return self._cached_index_sql
+
+
+    def index_places(self, worker, places):
+        values = []
+        for place in places:
+            values.extend((place[x] for x in ('place_id', 'address')))
+
+        worker.perform(self._index_sql(len(places)), values)
+
+
+class RankRunner(AbstractPlacexRunner):
+    """ Returns SQL commands for indexing one rank within the placex table.
+    """
 
     def name(self):
         return "rank {}".format(self.rank)
@@ -20,24 +48,16 @@ class RankRunner:
                """.format(self.rank)
 
     def sql_get_objects(self):
-        return """SELECT place_id FROM placex
-                  WHERE indexed_status > 0 and rank_address = {}
-                  ORDER BY geometry_sector""".format(self.rank)
-
-    @staticmethod
-    def sql_index_place(ids):
-        return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
-               .format(','.join((str(i) for i in ids)))
+        return """{} WHERE indexed_status > 0 and rank_address = {}
+                     ORDER BY geometry_sector
+               """.format(self.SELECT_SQL, self.rank)
 
 
-class BoundaryRunner:
+class BoundaryRunner(AbstractPlacexRunner):
     """ Returns SQL commands for indexing the administrative boundaries
         of a certain rank.
     """
 
-    def __init__(self, rank):
-        self.rank = rank
-
     def name(self):
         return "boundaries rank {}".format(self.rank)
 
@@ -49,16 +69,10 @@ class BoundaryRunner:
                """.format(self.rank)
 
     def sql_get_objects(self):
-        return """SELECT place_id FROM placex
-                  WHERE indexed_status > 0 and rank_search = {}
-                        and class = 'boundary' and type = 'administrative'
-                  ORDER BY partition, admin_level
-               """.format(self.rank)
-
-    @staticmethod
-    def sql_index_place(ids):
-        return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
-               .format(','.join((str(i) for i in ids)))
+        return """{} WHERE indexed_status > 0 and rank_search = {}
+                           and class = 'boundary' and type = 'administrative'
+                     ORDER BY partition, admin_level
+               """.format(self.SELECT_SQL, self.rank)
 
 
 class InterpolationRunner:
@@ -82,10 +96,10 @@ class InterpolationRunner:
                   ORDER BY geometry_sector"""
 
     @staticmethod
-    def sql_index_place(ids):
-        return """UPDATE location_property_osmline
-                  SET indexed_status = 0 WHERE place_id IN ({})
-               """.format(','.join((str(i) for i in ids)))
+    def index_places(worker, ids):
+        worker.perform(""" UPDATE location_property_osmline
+                           SET indexed_status = 0 WHERE place_id IN ({})
+                       """.format(','.join((str(i[0]) for i in ids))))
 
 
 class PostcodeRunner:
@@ -107,7 +121,7 @@ class PostcodeRunner:
                   ORDER BY country_code, postcode"""
 
     @staticmethod
-    def sql_index_place(ids):
-        return """UPDATE location_postcode SET indexed_status = 0
-                  WHERE place_id IN ({})
-               """.format(','.join((str(i) for i in ids)))
+    def index_places(worker, ids):
+        worker.perform(""" UPDATE location_postcode SET indexed_status = 0
+                           WHERE place_id IN ({})
+                       """.format(','.join((str(i[0]) for i in ids))))
index 6381e4b4a2da8865c89e711910458a2a7fb13d74..345e134975db02b77425634d4ecb4a08b8242661 100644 (file)
@@ -106,9 +106,19 @@ class NominatimEnvironment:
             self.website_dir.cleanup()
 
         self.website_dir = tempfile.TemporaryDirectory()
-        cfg = Configuration(None, self.src_dir / 'settings', environ=self.test_env)
-        cfg.lib_dir.php = self.src_dir / 'lib-php'
-        refresh.setup_website(Path(self.website_dir.name) / 'website', cfg)
+        refresh.setup_website(Path(self.website_dir.name) / 'website',
+                              self.get_test_config())
+
+
+    def get_test_config(self):
+        cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
+                            environ=self.test_env)
+        cfg.set_libdirs(module=self.build_dir / 'module',
+                        osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
+                        php=self.src_dir / 'lib-php',
+                        sql=self.src_dir / 'lib-sql',
+                        data=self.src_dir / 'data')
+        return cfg
 
     def get_libpq_dsn(self):
         dsn = self.test_env['NOMINATIM_DATABASE_DSN']
index 72a610eb123733db313ee74d510b633afdac5fb3..dea09833f9cc84efd66fd743a4e79dc027562429 100644 (file)
@@ -7,6 +7,7 @@ from place_inserter import PlaceColumn
 from table_compare import NominatimID, DBRow
 
 from nominatim.indexer import indexer
+from nominatim.tokenizer import factory as tokenizer_factory
 
 def check_database_integrity(context):
     """ Check some generic constraints on the tables.
@@ -86,6 +87,9 @@ def add_data_to_planet_ways(context):
 def import_and_index_data_from_place_table(context):
     """ Import data previously set up in the place table.
     """
+    nctx = context.nominatim
+
+    tokenizer = tokenizer_factory.create_tokenizer(nctx.get_test_config())
     context.nominatim.copy_from_place(context.db)
 
     # XXX use tool function as soon as it is ported
index 6692eba61d349b8908415d793078fba17b0db3af..223a599ec4fc818c0bd3ad1a3712251b6ab8ec7a 100644 (file)
@@ -17,6 +17,7 @@ class IndexerTestDB:
         self.conn = conn
         self.conn.set_isolation_level(0)
         with self.conn.cursor() as cur:
+            cur.execute('CREATE EXTENSION hstore')
             cur.execute("""CREATE TABLE placex (place_id BIGINT,
                                                 class TEXT,
                                                 type TEXT,
@@ -26,6 +27,7 @@ class IndexerTestDB:
                                                 indexed_date TIMESTAMP,
                                                 partition SMALLINT,
                                                 admin_level SMALLINT,
+                                                address HSTORE,
                                                 geometry_sector INTEGER)""")
             cur.execute("""CREATE TABLE location_property_osmline (
                                place_id BIGINT,
@@ -46,6 +48,17 @@ class IndexerTestDB:
                              END IF;
                              RETURN NEW;
                            END; $$ LANGUAGE plpgsql;""")
+            cur.execute("""CREATE OR REPLACE FUNCTION placex_prepare_update(p placex,
+                                                      OUT name HSTORE,
+                                                      OUT address HSTORE,
+                                                      OUT country_feature VARCHAR)
+                           AS $$
+                           BEGIN
+                            address := p.address;
+                            name := p.address;
+                           END;
+                           $$ LANGUAGE plpgsql STABLE;
+                        """)
             for table in ('placex', 'location_property_osmline', 'location_postcode'):
                 cur.execute("""CREATE TRIGGER {0}_update BEFORE UPDATE ON {0}
                                FOR EACH ROW EXECUTE PROCEDURE date_update()