]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/api_result.py
Reverse geocode include geometry
[nominatim.git] / tests / steps / api_result.py
1 """ Steps for checking the results of queries.
2 """
3
4 from nose.tools import *
5 from lettuce import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
8 import json
9 import logging
10 import re
11 from xml.dom.minidom import parseString
12
13 logger = logging.getLogger(__name__)
14
15 def _parse_xml():
16     """ Puts the DOM structure into more convenient python
17         with a similar structure as the json document, so
18         that the same the semantics can be used. It does not
19         check if the content is valid (or at least not more than
20         necessary to transform it into a dict structure).
21     """
22     page = parseString(world.page).documentElement
23
24     # header info
25     world.result_header = OrderedDict(page.attributes.items())
26     logger.debug('Result header: %r' % (world.result_header))
27     world.results = []
28
29     # results
30     if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
31         for node in page.childNodes:
32             if node.nodeName != "#text":
33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34                 newresult = OrderedDict(node.attributes.items())
35                 assert_not_in('address', newresult)
36                 assert_not_in('geokml', newresult)
37                 assert_not_in('extratags', newresult)
38                 assert_not_in('namedetails', newresult)
39                 address = OrderedDict()
40                 for sub in node.childNodes:
41                     if sub.nodeName == 'geokml':
42                         newresult['geokml'] = sub.childNodes[0].toxml()
43                     elif sub.nodeName == 'extratags':
44                         newresult['extratags'] = {}
45                         for tag in sub.childNodes:
46                             assert_equals(tag.nodeName, 'tag')
47                             attrs = dict(tag.attributes.items())
48                             assert_in('key', attrs)
49                             assert_in('value', attrs)
50                             newresult['extratags'][attrs['key']] = attrs['value']
51                     elif sub.nodeName == 'namedetails':
52                         newresult['namedetails'] = {}
53                         for tag in sub.childNodes:
54                             assert_equals(tag.nodeName, 'name')
55                             attrs = dict(tag.attributes.items())
56                             assert_in('desc', attrs)
57                             newresult['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
58
59                     elif sub.nodeName == '#text':
60                         pass
61                     else:
62                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
63                 if address:
64                     newresult['address'] = address
65                 world.results.append(newresult)
66     elif page.nodeName == 'reversegeocode':
67         haserror = False
68         address = {}
69         for node in page.childNodes:
70             if node.nodeName == 'result':
71                 assert_equals(len(world.results), 0)
72                 assert (not haserror)
73                 world.results.append(OrderedDict(node.attributes.items()))
74                 assert_not_in('display_name', world.results[0])
75                 assert_not_in('address', world.results[0])
76                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
77             elif node.nodeName == 'error':
78                 assert_equals(len(world.results), 0)
79                 haserror = True
80             elif node.nodeName == 'addressparts':
81                 assert (not haserror)
82                 address = OrderedDict()
83                 for sub in node.childNodes:
84                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
85                 world.results[0]['address'] = address
86             elif node.nodeName == 'extratags':
87                 world.results[0]['extratags'] = {}
88                 for tag in node.childNodes:
89                     assert_equals(tag.nodeName, 'tag')
90                     attrs = dict(tag.attributes.items())
91                     assert_in('key', attrs)
92                     assert_in('value', attrs)
93                     world.results[0]['extratags'][attrs['key']] = attrs['value']
94             elif node.nodeName == 'namedetails':
95                 world.results[0]['namedetails'] = {}
96                 for tag in node.childNodes:
97                     assert_equals(tag.nodeName, 'name')
98                     attrs = dict(tag.attributes.items())
99                     assert_in('desc', attrs)
100                     world.results[0]['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
101             elif node.nodeName == "geokml":
102                 world.results[0]['geokml'] = node
103             elif node.nodeName == "#text":
104                 pass
105             else:
106                 assert False, "Unknown content '%s' in XML" % node.nodeName
107     else:
108         assert False, "Unknown document node name %s in XML" % page.nodeName
109
110     logger.debug("The following was parsed out of XML:")
111     logger.debug(world.results)
112
113 @step(u'a HTTP (\d+) is returned')
114 def api_result_http_error(step, error):
115     assert_equals(world.returncode, int(error))
116
117 @step(u'the result is valid( \w+)?')
118 def api_result_is_valid(step, fmt):
119     assert_equals(world.returncode, 200)
120
121     if world.response_format == 'html':
122         document, errors = tidy_document(world.page, 
123                              options={'char-encoding' : 'utf8'})
124         # assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
125         world.results = document
126     elif world.response_format == 'xml':
127         _parse_xml()
128     elif world.response_format == 'json':
129         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
130         if world.request_type == 'reverse':
131             world.results = (world.results,)
132     else:
133         assert False, "Unknown page format: %s" % (world.response_format)
134
135     if fmt:
136         assert_equals (fmt.strip(), world.response_format)
137
138
139 def compare(operator, op1, op2):
140     if operator == 'less than':
141         return op1 < op2
142     elif operator == 'more than':
143         return op1 > op2
144     elif operator == 'exactly':
145         return op1 == op2
146     elif operator == 'at least':
147         return op1 >= op2
148     elif operator == 'at most':
149         return op1 <= op2
150     else:
151         raise Exception("unknown operator '%s'" % operator)
152
153 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
154 def validate_result_number(step, operator, number):
155     step.given('the result is valid')
156     numres = len(world.results)
157     assert compare(operator, numres, int(number)), \
158         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
159
160 @step(u'result (\d+) has( not)? attributes (\S+)')
161 def search_check_for_result_attribute(step, num, invalid, attrs):
162     num = int(num)
163     step.given('at least %d results are returned' % (num + 1))
164     res = world.results[num]
165     for attr in attrs.split(','):
166         if invalid:
167             assert_not_in(attr.strip(), res)
168         else:
169             assert_in(attr.strip(),res)
170         
171 @step(u'there is a json wrapper "([^"]*)"')
172 def api_result_check_json_wrapper(step, wrapper):
173     step.given('the result is valid json')
174     assert_equals(world.json_callback, wrapper)
175
176 @step(u'result header contains')
177 def api_result_header_contains(step):
178     step.given('the result is valid')
179     for line in step.hashes:
180         assert_in(line['attr'], world.result_header)
181         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
182
183 @step(u'result header has no attribute (.*)')
184 def api_result_header_contains_not(step, attr):
185     step.given('the result is valid')
186     assert_not_in(attr, world.result_header)
187
188 @step(u'results contain$')
189 def api_result_contains(step):
190     step.given('at least 1 result is returned')
191     for line in step.hashes:
192         if 'ID' in line:
193             reslist = (world.results[int(line['ID'])],)
194         else:
195             reslist = world.results
196         for k,v in line.iteritems():
197             if k == 'latlon':
198                 for curres in reslist:
199                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
200             elif k != 'ID':
201                 for curres in reslist:
202                     assert_in(k, curres)
203                     if v[0] in '<>=':
204                         # mathematical operation
205                         evalexp = '%s %s' % (curres[k], v)
206                         res = eval(evalexp)
207                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
208                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
209                     else:
210                         # regex match
211                         m = re.match("%s$" % (v,), curres[k])
212                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
213
214
215 @step(u'result addresses contain$')
216 def api_result_address_contains(step):
217     step.given('the result is valid')
218     for line in step.hashes:
219         if 'ID' in line:
220             reslist = (world.results[int(line['ID'])],)
221         else:
222             reslist = world.results
223         for k,v in line.iteritems():
224             if k != 'ID':
225                 for res in reslist:
226                     curres = res['address']
227                     assert_in(k, curres)
228                     m = re.match("%s$" % (v,), curres[k])
229                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
230
231
232 @step(u'address of result (\d+) contains')
233 def api_result_address_exact(step, resid):
234     resid = int(resid)
235     step.given('at least %d results are returned' % (resid + 1))
236     addr = world.results[resid]['address']
237     for line in step.hashes:
238         assert_in(line['type'], addr)
239         m = re.match("%s$" % line['value'], addr[line['type']])
240         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
241                                   line['type'], line['value'], addr[line['type']]))
242         #assert_equals(line['value'], addr[line['type']])
243
244 @step(u'address of result (\d+) does not contain (.*)')
245 def api_result_address_details_missing(step, resid, types):
246     resid = int(resid)
247     step.given('at least %d results are returned' % (resid + 1))
248     addr = world.results[resid]['address']
249     for t in types.split(','):
250         assert_not_in(t.strip(), addr)
251
252
253 @step(u'address of result (\d+) is')
254 def api_result_address_exact(step, resid):
255     resid = int(resid)
256     step.given('at least %d results are returned' % (resid + 1))
257     result = world.results[resid]
258     linenr = 0
259     assert_equals(len(step.hashes), len(result['address']))
260     for k,v in result['address'].iteritems():
261         assert_equals(step.hashes[linenr]['type'], k)
262         assert_equals(step.hashes[linenr]['value'], v)
263         linenr += 1
264
265
266 @step('there are( no)? duplicates')
267 def api_result_check_for_duplicates(step, nodups=None):
268     step.given('at least 1 result is returned')
269     resarr = []
270     for res in world.results:
271         resarr.append((res['osm_type'], res['class'],
272                         res['type'], res['display_name']))
273
274     if nodups is None:
275         assert len(resarr) > len(set(resarr))
276     else:
277         assert_equal(len(resarr), len(set(resarr)))