]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/api_result.py
add mention of required Tiger files for test database
[nominatim.git] / tests / steps / api_result.py
1 """ Steps for checking the results of queries.
2 """
3
4 from nose.tools import *
5 from lettuce import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
8 import json
9 import logging
10 import re
11 from xml.dom.minidom import parseString
12
13 logger = logging.getLogger(__name__)
14
15 def _parse_xml():
16     """ Puts the DOM structure into more convenient python
17         with a similar structure as the json document, so
18         that the same the semantics can be used. It does not
19         check if the content is valid (or at least not more than
20         necessary to transform it into a dict structure).
21     """
22     page = parseString(world.page).documentElement
23
24     # header info
25     world.result_header = OrderedDict(page.attributes.items())
26     logger.debug('Result header: %r' % (world.result_header))
27     world.results = []
28
29     # results
30     if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
31         for node in page.childNodes:
32             if node.nodeName != "#text":
33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34                 newresult = OrderedDict(node.attributes.items())
35                 assert_not_in('address', newresult)
36                 assert_not_in('geokml', newresult)
37                 assert_not_in('extratags', newresult)
38                 assert_not_in('namedetails', newresult)
39                 address = OrderedDict()
40                 for sub in node.childNodes:
41                     if sub.nodeName == 'geokml':
42                         newresult['geokml'] = sub.childNodes[0].toxml()
43                     elif sub.nodeName == 'extratags':
44                         newresult['extratags'] = {}
45                         for tag in sub.childNodes:
46                             assert_equals(tag.nodeName, 'tag')
47                             attrs = dict(tag.attributes.items())
48                             assert_in('key', attrs)
49                             assert_in('value', attrs)
50                             newresult['extratags'][attrs['key']] = attrs['value']
51                     elif sub.nodeName == 'namedetails':
52                         newresult['namedetails'] = {}
53                         for tag in sub.childNodes:
54                             assert_equals(tag.nodeName, 'name')
55                             attrs = dict(tag.attributes.items())
56                             assert_in('desc', attrs)
57                             newresult['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
58
59                     elif sub.nodeName == '#text':
60                         pass
61                     else:
62                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
63                 if address:
64                     newresult['address'] = address
65                 world.results.append(newresult)
66     elif page.nodeName == 'reversegeocode':
67         haserror = False
68         address = {}
69         for node in page.childNodes:
70             if node.nodeName == 'result':
71                 assert_equals(len(world.results), 0)
72                 assert (not haserror)
73                 world.results.append(OrderedDict(node.attributes.items()))
74                 assert_not_in('display_name', world.results[0])
75                 assert_not_in('address', world.results[0])
76                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
77             elif node.nodeName == 'error':
78                 assert_equals(len(world.results), 0)
79                 haserror = True
80             elif node.nodeName == 'addressparts':
81                 assert (not haserror)
82                 address = OrderedDict()
83                 for sub in node.childNodes:
84                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
85                 world.results[0]['address'] = address
86             elif node.nodeName == 'extratags':
87                 world.results[0]['extratags'] = {}
88                 for tag in node.childNodes:
89                     assert_equals(tag.nodeName, 'tag')
90                     attrs = dict(tag.attributes.items())
91                     assert_in('key', attrs)
92                     assert_in('value', attrs)
93                     world.results[0]['extratags'][attrs['key']] = attrs['value']
94             elif node.nodeName == 'namedetails':
95                 world.results[0]['namedetails'] = {}
96                 for tag in node.childNodes:
97                     assert_equals(tag.nodeName, 'name')
98                     attrs = dict(tag.attributes.items())
99                     assert_in('desc', attrs)
100                     world.results[0]['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
101             elif node.nodeName == "geokml":
102                 world.results[0]['geokml'] = node
103             elif node.nodeName == "#text":
104                 pass
105             else:
106                 assert False, "Unknown content '%s' in XML" % node.nodeName
107     else:
108         assert False, "Unknown document node name %s in XML" % page.nodeName
109
110     logger.debug("The following was parsed out of XML:")
111     logger.debug(world.results)
112
113 @step(u'a HTTP (\d+) is returned')
114 def api_result_http_error(step, error):
115     assert_equals(world.returncode, int(error))
116
117 @step(u'the result is valid( \w+)?')
118 def api_result_is_valid(step, fmt):
119     assert_equals(world.returncode, 200)
120
121     if world.response_format == 'html':
122         document, errors = tidy_document(world.page, 
123                              options={'char-encoding' : 'utf8'})
124         # assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
125         world.results = document
126     elif world.response_format == 'xml':
127         _parse_xml()
128     elif world.response_format == 'json':
129         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
130         if world.request_type == 'reverse':
131             world.results = (world.results,)
132     else:
133         assert False, "Unknown page format: %s" % (world.response_format)
134
135     if fmt:
136         assert_equals (fmt.strip(), world.response_format)
137
138
139 def compare(operator, op1, op2):
140     if operator == 'less than':
141         return op1 < op2
142     elif operator == 'more than':
143         return op1 > op2
144     elif operator == 'exactly':
145         return op1 == op2
146     elif operator == 'at least':
147         return op1 >= op2
148     elif operator == 'at most':
149         return op1 <= op2
150     else:
151         raise Exception("unknown operator '%s'" % operator)
152
153 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
154 def validate_result_number(step, operator, number):
155     step.given('the result is valid')
156     numres = len(world.results)
157     assert compare(operator, numres, int(number)), \
158         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
159
160 @step(u'result (\d+) has( not)? attributes (\S+)')
161 def search_check_for_result_attribute(step, num, invalid, attrs):
162     num = int(num)
163     step.given('at least %d results are returned' % (num + 1))
164     res = world.results[num]
165     for attr in attrs.split(','):
166         if invalid:
167             assert_not_in(attr.strip(), res)
168         else:
169             assert_in(attr.strip(),res)
170         
171 @step(u'there is a json wrapper "([^"]*)"')
172 def api_result_check_json_wrapper(step, wrapper):
173     step.given('the result is valid json')
174     assert_equals(world.json_callback, wrapper)
175
176 @step(u'result header contains')
177 def api_result_header_contains(step):
178     step.given('the result is valid')
179     for line in step.hashes:
180         assert_in(line['attr'], world.result_header)
181         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
182
183 @step(u'result header has no attribute (.*)')
184 def api_result_header_contains_not(step, attr):
185     step.given('the result is valid')
186     assert_not_in(attr, world.result_header)
187
188 @step(u'results contain$')
189 def api_result_contains(step):
190     step.given('at least 1 result is returned')
191     for line in step.hashes:
192         if 'ID' in line:
193             reslist = (world.results[int(line['ID'])],)
194         else:
195             reslist = world.results
196         for k,v in line.iteritems():
197             if k == 'latlon':
198                 for curres in reslist:
199                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
200             elif k != 'ID':
201                 for curres in reslist:
202                     assert_in(k, curres)
203                     if v[0] in '<>=':
204                         # mathematical operation
205                         evalexp = '%s %s' % (curres[k], v)
206                         res = eval(evalexp)
207                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
208                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
209                     else:
210                         # regex match
211                         m = re.match("%s$" % (v,), curres[k])
212                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
213
214 @step(u'results contain valid boundingboxes$')
215 def api_result_address_contains(step):
216     step.given('the result is valid')
217     for curres in world.results:
218         bb = curres['boundingbox']
219         if world.response_format == 'json':
220             bb = ','.join(bb)
221         m = re.match('^(-?\d+\.\d+),(-?\d+\.\d+),(-?\d+\.\d+),(-?\d+\.\d+)$', bb)
222         assert_is_not_none(m, msg="invalid boundingbox: %s." % (curres['boundingbox']))
223
224 @step(u'result addresses contain$')
225 def api_result_address_contains(step):
226     step.given('the result is valid')
227     for line in step.hashes:
228         if 'ID' in line:
229             reslist = (world.results[int(line['ID'])],)
230         else:
231             reslist = world.results
232         for k,v in line.iteritems():
233             if k != 'ID':
234                 for res in reslist:
235                     curres = res['address']
236                     assert_in(k, curres)
237                     m = re.match("%s$" % (v,), curres[k])
238                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
239
240
241 @step(u'address of result (\d+) contains')
242 def api_result_address_exact(step, resid):
243     resid = int(resid)
244     step.given('at least %d results are returned' % (resid + 1))
245     addr = world.results[resid]['address']
246     for line in step.hashes:
247         assert_in(line['type'], addr)
248         m = re.match("%s$" % line['value'], addr[line['type']])
249         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
250                                   line['type'], line['value'], addr[line['type']]))
251         #assert_equals(line['value'], addr[line['type']])
252
253 @step(u'address of result (\d+) does not contain (.*)')
254 def api_result_address_details_missing(step, resid, types):
255     resid = int(resid)
256     step.given('at least %d results are returned' % (resid + 1))
257     addr = world.results[resid]['address']
258     for t in types.split(','):
259         assert_not_in(t.strip(), addr)
260
261
262 @step(u'address of result (\d+) is')
263 def api_result_address_exact(step, resid):
264     resid = int(resid)
265     step.given('at least %d results are returned' % (resid + 1))
266     result = world.results[resid]
267     linenr = 0
268     assert_equals(len(step.hashes), len(result['address']))
269     for k,v in result['address'].iteritems():
270         assert_equals(step.hashes[linenr]['type'], k)
271         assert_equals(step.hashes[linenr]['value'], v)
272         linenr += 1
273
274
275 @step('there are( no)? duplicates')
276 def api_result_check_for_duplicates(step, nodups=None):
277     step.given('at least 1 result is returned')
278     resarr = []
279     for res in world.results:
280         resarr.append((res['osm_type'], res['class'],
281                         res['type'], res['display_name']))
282
283     if nodups is None:
284         assert len(resarr) > len(set(resarr))
285     else:
286         assert_equal(len(resarr), len(set(resarr)))