]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/db_results.py
fix address interpolation for self-intersecting ways
[nominatim.git] / tests / steps / db_results.py
1 """ Steps for checking the DB after import and update tests.
2
3     There are two groups of test here. The first group tests
4     the contents of db tables directly, the second checks
5     query results by using the command line query tool.
6 """
7
8 from nose.tools import *
9 from lettuce import *
10 import psycopg2
11 import psycopg2.extensions
12 import psycopg2.extras
13 import os
14 import subprocess
15 import random
16 import json
17 import re
18 import logging
19 from collections import OrderedDict
20
21 logger = logging.getLogger(__name__)
22
23 @step(u'table placex contains as names for (N|R|W)(\d+)')
24 def check_placex_names(step, osmtyp, osmid):
25     """ Check for the exact content of the name hstaore in placex.
26     """
27     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
28     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
29     for line in cur:
30         names = dict(line['name'])
31         for name in step.hashes:
32             assert_in(name['k'], names)
33             assert_equals(names[name['k']], name['v'])
34             del names[name['k']]
35         assert_equals(len(names), 0)
36
37
38
39 @step(u'table ([a-z_]+) contains$')
40 def check_placex_content(step, tablename):
41     """ check that the given lines are in the given table
42         Entries are searched by osm_type/osm_id and then all
43         given columns are tested. If there is more than one
44         line for an OSM object, they must match in these columns.
45     """
46     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
47     for line in step.hashes:
48         osmtype, osmid, cls = world.split_id(line['object'])
49         q = 'SELECT *'
50         if tablename == 'placex':
51             q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
52         q = q + ", ST_GeometryType(geometry) as geometrytype"
53         q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
54         if cls is None:
55             params = (osmtype, osmid)
56         else:
57             q = q + ' and class = %s'
58             params = (osmtype, osmid, cls)
59         cur.execute(q, params)
60         assert(cur.rowcount > 0)
61         for res in cur:
62             for k,v in line.iteritems():
63                 if not k == 'object':
64                     assert_in(k, res)
65                     if type(res[k]) is dict:
66                         val = world.make_hash(v)
67                         assert_equals(res[k], val)
68                     elif k in ('parent_place_id', 'linked_place_id'):
69                         pid = world.get_placeid(v)
70                         assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
71                     elif k == 'centroid':
72                         world.match_geometry((res['clat'], res['clon']), v)
73                     else:
74                         assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
75
76 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
77 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
78     cur = world.conn.cursor()
79     q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
80     args = [osmtyp, int(osmid)]
81     if placeclass is not None:
82         q = q + ' and class = %s'
83         args.append(placeclass[1:])
84     cur.execute(q, args)
85     numres = cur.fetchone()[0]
86     assert_equals (numres, 0)
87
88 @step(u'search_name table contains$')
89 def check_search_name_content(step):
90     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
91     for line in step.hashes:
92         placeid = world.get_placeid(line['place_id'])
93         cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
94         assert(cur.rowcount > 0)
95         for res in cur:
96             for k,v in line.iteritems():
97                 if k in ('search_rank', 'address_rank'):
98                     assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
99                 elif k in ('importance'):
100                     assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
101                 elif k in ('name_vector', 'nameaddress_vector'):
102                     terms = [x.strip().replace('#', ' ') for x in v.split(',')]
103                     cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
104                     assert cur.rowcount >= len(terms)
105                     for wid in cur:
106                         assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
107                 elif k in ('country_code'):
108                     assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
109                 elif k == 'place_id':
110                     pass
111                 else:
112                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
113
114 @step(u'node (\d+) expands to housenumbers')
115 def check_interpolated_housenumbers(step, nodeid):
116     """Check that the exact set of housenumbers has been entered in
117        placex for the given source node. Expected are tow columns:
118        housenumber and centroid
119     """
120     numbers = {}
121     for line in step.hashes:
122         assert line["housenumber"] not in numbers
123         numbers[line["housenumber"]] = line["centroid"]
124     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
125     cur.execute("""SELECT DISTINCT housenumber,
126                           ST_X(centroid) as clat, ST_Y(centroid) as clon
127                    FROM placex WHERE osm_type = 'N' and osm_id = %s""",
128                    (int(nodeid),))
129     assert_equals(len(numbers), cur.rowcount)
130     for r in cur:
131         assert_in(r["housenumber"], numbers)
132         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
133         del numbers[r["housenumber"]]
134
135
136 @step(u'table search_name has no entry for (.*)')
137 def check_placex_missing(step, osmid):
138     """ Checks if there is an entry in the search index for the
139         given place object.
140     """
141     cur = world.conn.cursor()
142     placeid = world.get_placeid(osmid)
143     cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
144     numres = cur.fetchone()[0]
145     assert_equals (numres, 0)
146