git.openstreetmap.org Git - nominatim.git/commitdiff
bdd: move geometry creation into separate file
author Sarah Hoffmann <lonvia@denofr.de>
Mon, 4 Jan 2021 15:29:15 +0000 (16:29 +0100)
committer Sarah Hoffmann <lonvia@denofr.de>
Mon, 4 Jan 2021 15:34:40 +0000 (16:34 +0100)
Also renames OSMDataFactory to the more appropriate
GeometryFactory and modernizes the code for Python 3.

test/bdd/environment.py
test/bdd/steps/geometry_factory.py [new file with mode: 0644]

index d95fea4a02ac239ef72a6ce998576e06b4f1676d..b6f6b986404d967190a42c221b089a1efd4e8517 100644 (file)
@@ -6,6 +6,8 @@ import psycopg2.extras
 import subprocess
 import tempfile
 
+from steps.geometry_factory import GeometryFactory
+
 logger = logging.getLogger(__name__)
 
 userconfig = {
@@ -209,83 +211,6 @@ class NominatimEnvironment(object):
         assert (proc.returncode == 0), "Script '%s' failed:\n%s\n%s\n" % (script, outp, outerr)
 
 
-class OSMDataFactory(object):
-
-    def __init__(self):
-        scriptpath = os.path.dirname(os.path.abspath(__file__))
-        self.scene_path = os.environ.get('SCENE_PATH',
-                           os.path.join(scriptpath, '..', 'scenes', 'data'))
-        self.scene_cache = {}
-        self.clear_grid()
-
-    def parse_geometry(self, geom, scene):
-        if geom.find(':') >= 0:
-            return "ST_SetSRID(%s, 4326)" % self.get_scene_geometry(scene, geom)
-
-        if geom.find(',') < 0:
-            out = "POINT(%s)" % self.mk_wkt_point(geom)
-        elif geom.find('(') < 0:
-            line = ','.join([self.mk_wkt_point(x) for x in geom.split(',')])
-            out = "LINESTRING(%s)" % line
-        else:
-            inner = geom.strip('() ')
-            line = ','.join([self.mk_wkt_point(x) for x in inner.split(',')])
-            out = "POLYGON((%s))" % line
-
-        return "ST_SetSRID('%s'::geometry, 4326)" % out
-
-    def mk_wkt_point(self, point):
-        geom = point.strip()
-        if geom.find(' ') >= 0:
-            return geom
-        else:
-            pt = self.grid_node(int(geom))
-            assert pt is not None, "Bad scenario: Point '{}' not found in grid".format(geom)
-            return "%f %f" % pt
-
-    def get_scene_geometry(self, default_scene, name):
-        geoms = []
-        for obj in name.split('+'):
-            oname = obj.strip()
-            if oname.startswith(':'):
-                assert default_scene is not None, "Bad scenario: You need to set a scene"
-                defscene = self.load_scene(default_scene)
-                wkt = defscene[oname[1:]]
-            else:
-                scene, obj = oname.split(':', 2)
-                scene_geoms = self.load_scene(scene)
-                wkt = scene_geoms[obj]
-
-            geoms.append("'%s'::geometry" % wkt)
-
-        if len(geoms) == 1:
-            return geoms[0]
-        else:
-            return 'ST_LineMerge(ST_Collect(ARRAY[%s]))' % ','.join(geoms)
-
-    def load_scene(self, name):
-        if name in self.scene_cache:
-            return self.scene_cache[name]
-
-        scene = {}
-        with open(os.path.join(self.scene_path, "%s.wkt" % name), 'r') as fd:
-            for line in fd:
-                if line.strip():
-                    obj, wkt = line.split('|', 2)
-                    scene[obj.strip()] = wkt.strip()
-            self.scene_cache[name] = scene
-
-        return scene
-
-    def clear_grid(self):
-        self.grid = {}
-
-    def add_grid_node(self, nodeid, x, y):
-        self.grid[nodeid] = (x, y)
-
-    def grid_node(self, nodeid):
-        return self.grid.get(nodeid)
-
 
 def before_all(context):
     # logging setup
@@ -296,7 +221,7 @@ def before_all(context):
     logging.debug('User config: %s' %(str(context.config.userdata)))
     # Nominatim test setup
     context.nominatim = NominatimEnvironment(context.config.userdata)
-    context.osm = OSMDataFactory()
+    context.osm = GeometryFactory()
 
 
 def before_scenario(context, scenario):
diff --git a/test/bdd/steps/geometry_factory.py b/test/bdd/steps/geometry_factory.py
new file mode 100644 (file)
index 0000000..7eedfc3
--- /dev/null
@@ -0,0 +1,113 @@
+from pathlib import Path
+import os
+
+class GeometryFactory:
+    """ Provides functions to create geometries from scenes and data grids.
+    """
+
+    def __init__(self):
+        defpath = Path(__file__) / '..' / '..' / '..' / 'scenes' / 'data'
+        self.scene_path = os.environ.get('SCENE_PATH', defpath.resolve())
+        self.scene_cache = {}
+        self.clear_grid()
+
+    def parse_geometry(self, geom, scene):
+        """ Create a WKT SQL term for the given geometry.
+            The function understands the following formats:
+
+              [<scene>]:<name>
+                 Geometry from a scene. If the scene is omitted, use the
+                 default scene.
+              <P>
+                 Point geometry
+              <P>,...,<P>
+                 Line geometry
+              (<P>,...,<P>)
+                 Polygon geometry
+
+           <P> may either be a coordinate of the form '<x> <y>' or a single
+           number. In the latter case it must refer to a point in
+           a previously defined grid.
+        """
+        if geom.find(':') >= 0:
+            return "ST_SetSRID({}, 4326)".format(self.get_scene_geometry(scene, geom))
+
+        if geom.find(',') < 0:
+            out = "POINT({})".format(self.mk_wkt_point(geom))
+        elif geom.find('(') < 0:
+            out = "LINESTRING({})".format(self.mk_wkt_points(geom))
+        else:
+            out = "POLYGON(({}))".format(self.mk_wkt_points(geom.strip('() ')))
+
+        return "ST_SetSRID('{}'::geometry, 4326)".format(out)
+
+    def mk_wkt_point(self, point):
+        """ Parse a point description.
+            The point may either consist of 'x y' coordinates or a number
+            that refers to a grid setup.
+        """
+        geom = point.strip()
+        if geom.find(' ') >= 0:
+            return geom
+
+        try:
+            pt = self.grid_node(int(geom))
+        except ValueError:
+            assert False, "Scenario error: Point '{}' is not a number".format(geom)
+
+        assert pt is not None, "Scenario error: Point '{}' not found in grid".format(geom)
+        return "{} {}".format(*pt)
+
+    def mk_wkt_points(self, geom):
+        """ Parse a list of points.
+            The list must be a comma-separated list of points. Points
+            in coordinate and grid format may be mixed.
+        """
+        return ','.join([self.mk_wkt_point(x) for x in geom.split(',')])
+
+    def get_scene_geometry(self, default_scene, name):
+        """ Load the geometry from a scene.
+        """
+        geoms = []
+        for obj in name.split('+'):
+            oname = obj.strip()
+            if oname.startswith(':'):
+                assert default_scene is not None, "Scenario error: You need to set a scene"
+                defscene = self.load_scene(default_scene)
+                wkt = defscene[oname[1:]]
+            else:
+                scene, obj = oname.split(':', 2)
+                scene_geoms = self.load_scene(scene)
+                wkt = scene_geoms[obj]
+
+            geoms.append("'{}'::geometry".format(wkt))
+
+        if len(geoms) == 1:
+            return geoms[0]
+
+        return 'ST_LineMerge(ST_Collect(ARRAY[{}]))'.format(','.join(geoms))
+
+    def load_scene(self, name):
+        """ Load a scene from a file.
+        """
+        if name in self.scene_cache:
+            return self.scene_cache[name]
+
+        scene = {}
+        with open(Path(self.scene_path) / "{}.wkt".format(name), 'r') as fd:
+            for line in fd:
+                if line.strip():
+                    obj, wkt = line.split('|', 2)
+                    scene[obj.strip()] = wkt.strip()
+            self.scene_cache[name] = scene
+
+        return scene
+
+    def clear_grid(self):
+        self.grid = {}
+
+    def add_grid_node(self, nodeid, x, y):
+        self.grid[nodeid] = (x, y)
+
+    def grid_node(self, nodeid):
+        return self.grid.get(nodeid)