1 """ Steps for checking the results of queries.
 
   4 from nose.tools import *
 
   6 from tidylib import tidy_document
 
   7 from collections import OrderedDict
 
  11 from xml.dom.minidom import parseString
 
  13 logger = logging.getLogger(__name__)
 
  16     """ Puts the DOM structure into more convenient python
 
  17         with a similar structure as the json document, so
 
  18         that the same the semantics can be used. It does not
 
  19         check if the content is valid (or at least not more than
 
  20         necessary to transform it into a dict structure).
 
  22     page = parseString(world.page).documentElement
 
  25     world.result_header = OrderedDict(page.attributes.items())
 
  26     logger.debug('Result header: %r' % (world.result_header))
 
  30     if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
 
  31         for node in page.childNodes:
 
  32             if node.nodeName != "#text":
 
  33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
 
  34                 newresult = OrderedDict(node.attributes.items())
 
  35                 assert_not_in('address', newresult)
 
  36                 assert_not_in('geokml', newresult)
 
  37                 assert_not_in('extratags', newresult)
 
  38                 assert_not_in('namedetails', newresult)
 
  39                 address = OrderedDict()
 
  40                 for sub in node.childNodes:
 
  41                     if sub.nodeName == 'geokml':
 
  42                         newresult['geokml'] = sub.childNodes[0].toxml()
 
  43                     elif sub.nodeName == 'extratags':
 
  44                         newresult['extratags'] = {}
 
  45                         for tag in sub.childNodes:
 
  46                             assert_equals(tag.nodeName, 'tag')
 
  47                             attrs = dict(tag.attributes.items())
 
  48                             assert_in('key', attrs)
 
  49                             assert_in('value', attrs)
 
  50                             newresult['extratags'][attrs['key']] = attrs['value']
 
  51                     elif sub.nodeName == 'namedetails':
 
  52                         newresult['namedetails'] = {}
 
  53                         for tag in sub.childNodes:
 
  54                             assert_equals(tag.nodeName, 'name')
 
  55                             attrs = dict(tag.attributes.items())
 
  56                             assert_in('desc', attrs)
 
  57                             newresult['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
 
  59                     elif sub.nodeName == '#text':
 
  62                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
 
  64                     newresult['address'] = address
 
  65                 world.results.append(newresult)
 
  66     elif page.nodeName == 'reversegeocode':
 
  69         for node in page.childNodes:
 
  70             if node.nodeName == 'result':
 
  71                 assert_equals(len(world.results), 0)
 
  73                 world.results.append(OrderedDict(node.attributes.items()))
 
  74                 assert_not_in('display_name', world.results[0])
 
  75                 assert_not_in('address', world.results[0])
 
  76                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
 
  77             elif node.nodeName == 'error':
 
  78                 assert_equals(len(world.results), 0)
 
  80             elif node.nodeName == 'addressparts':
 
  82                 address = OrderedDict()
 
  83                 for sub in node.childNodes:
 
  84                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
 
  85                 world.results[0]['address'] = address
 
  86             elif node.nodeName == 'extratags':
 
  87                 world.results[0]['extratags'] = {}
 
  88                 for tag in node.childNodes:
 
  89                     assert_equals(tag.nodeName, 'tag')
 
  90                     attrs = dict(tag.attributes.items())
 
  91                     assert_in('key', attrs)
 
  92                     assert_in('value', attrs)
 
  93                     world.results[0]['extratags'][attrs['key']] = attrs['value']
 
  94             elif node.nodeName == 'namedetails':
 
  95                 world.results[0]['namedetails'] = {}
 
  96                 for tag in node.childNodes:
 
  97                     assert_equals(tag.nodeName, 'name')
 
  98                     attrs = dict(tag.attributes.items())
 
  99                     assert_in('desc', attrs)
 
 100                     world.results[0]['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
 
 101             elif node.nodeName == "geokml":
 
 102                 world.results[0]['geokml'] = node
 
 103             elif node.nodeName == "#text":
 
 106                 assert False, "Unknown content '%s' in XML" % node.nodeName
 
 108         assert False, "Unknown document node name %s in XML" % page.nodeName
 
 110     logger.debug("The following was parsed out of XML:")
 
 111     logger.debug(world.results)
 
 113 @step(u'a HTTP (\d+) is returned')
 
 114 def api_result_http_error(step, error):
 
 115     assert_equals(world.returncode, int(error))
 
 117 @step(u'the result is valid( \w+)?')
 
 118 def api_result_is_valid(step, fmt):
 
 119     assert_equals(world.returncode, 200)
 
 121     if world.response_format == 'html':
 
 122         document, errors = tidy_document(world.page, 
 
 123                              options={'char-encoding' : 'utf8'})
 
 124         # assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
 
 125         world.results = document
 
 126     elif world.response_format == 'xml':
 
 128     elif world.response_format == 'json':
 
 129         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
 
 130         if world.request_type == 'reverse':
 
 131             world.results = (world.results,)
 
 133         assert False, "Unknown page format: %s" % (world.response_format)
 
 136         assert_equals (fmt.strip(), world.response_format)
 
 139 def compare(operator, op1, op2):
 
 140     if operator == 'less than':
 
 142     elif operator == 'more than':
 
 144     elif operator == 'exactly':
 
 146     elif operator == 'at least':
 
 148     elif operator == 'at most':
 
 151         raise Exception("unknown operator '%s'" % operator)
 
 153 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
 
 154 def validate_result_number(step, operator, number):
 
 155     step.given('the result is valid')
 
 156     numres = len(world.results)
 
 157     assert compare(operator, numres, int(number)), \
 
 158         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
 
 160 @step(u'result (\d+) has( not)? attributes (\S+)')
 
 161 def search_check_for_result_attribute(step, num, invalid, attrs):
 
 163     step.given('at least %d results are returned' % (num + 1))
 
 164     res = world.results[num]
 
 165     for attr in attrs.split(','):
 
 167             assert_not_in(attr.strip(), res)
 
 169             assert_in(attr.strip(),res)
 
 171 @step(u'there is a json wrapper "([^"]*)"')
 
 172 def api_result_check_json_wrapper(step, wrapper):
 
 173     step.given('the result is valid json')
 
 174     assert_equals(world.json_callback, wrapper)
 
 176 @step(u'result header contains')
 
 177 def api_result_header_contains(step):
 
 178     step.given('the result is valid')
 
 179     for line in step.hashes:
 
 180         assert_in(line['attr'], world.result_header)
 
 181         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
 
 183 @step(u'result header has no attribute (.*)')
 
 184 def api_result_header_contains_not(step, attr):
 
 185     step.given('the result is valid')
 
 186     assert_not_in(attr, world.result_header)
 
 188 @step(u'results contain$')
 
 189 def api_result_contains(step):
 
 190     step.given('at least 1 result is returned')
 
 191     for line in step.hashes:
 
 193             reslist = (world.results[int(line['ID'])],)
 
 195             reslist = world.results
 
 196         for k,v in line.iteritems():
 
 198                 for curres in reslist:
 
 199                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
 
 201                 for curres in reslist:
 
 204                         # mathematical operation
 
 205                         evalexp = '%s %s' % (curres[k], v)
 
 207                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
 
 208                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
 
 211                         m = re.match("%s$" % (v,), curres[k])
 
 212                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
 
 214 @step(u'results contain valid boundingboxes$')
 
 215 def api_result_address_contains(step):
 
 216     step.given('the result is valid')
 
 217     for curres in world.results:
 
 218         bb = curres['boundingbox']
 
 219         if world.response_format == 'json':
 
 221         m = re.match('^(-?\d+\.\d+),(-?\d+\.\d+),(-?\d+\.\d+),(-?\d+\.\d+)$', bb)
 
 222         assert_is_not_none(m, msg="invalid boundingbox: %s." % (curres['boundingbox']))
 
 224 @step(u'result addresses contain$')
 
 225 def api_result_address_contains(step):
 
 226     step.given('the result is valid')
 
 227     for line in step.hashes:
 
 229             reslist = (world.results[int(line['ID'])],)
 
 231             reslist = world.results
 
 232         for k,v in line.iteritems():
 
 235                     curres = res['address']
 
 237                     m = re.match("%s$" % (v,), curres[k])
 
 238                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
 
 241 @step(u'address of result (\d+) contains')
 
 242 def api_result_address_exact(step, resid):
 
 244     step.given('at least %d results are returned' % (resid + 1))
 
 245     addr = world.results[resid]['address']
 
 246     for line in step.hashes:
 
 247         assert_in(line['type'], addr)
 
 248         m = re.match("%s$" % line['value'], addr[line['type']])
 
 249         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
 
 250                                   line['type'], line['value'], addr[line['type']]))
 
 251         #assert_equals(line['value'], addr[line['type']])
 
 253 @step(u'address of result (\d+) does not contain (.*)')
 
 254 def api_result_address_details_missing(step, resid, types):
 
 256     step.given('at least %d results are returned' % (resid + 1))
 
 257     addr = world.results[resid]['address']
 
 258     for t in types.split(','):
 
 259         assert_not_in(t.strip(), addr)
 
 262 @step(u'address of result (\d+) is')
 
 263 def api_result_address_exact(step, resid):
 
 265     step.given('at least %d results are returned' % (resid + 1))
 
 266     result = world.results[resid]
 
 268     assert_equals(len(step.hashes), len(result['address']))
 
 269     for k,v in result['address'].iteritems():
 
 270         assert_equals(step.hashes[linenr]['type'], k)
 
 271         assert_equals(step.hashes[linenr]['value'], v)
 
 275 @step('there are( no)? duplicates')
 
 276 def api_result_check_for_duplicates(step, nodups=None):
 
 277     step.given('at least 1 result is returned')
 
 279     for res in world.results:
 
 280         resarr.append((res['osm_type'], res['class'],
 
 281                         res['type'], res['display_name']))
 
 284         assert len(resarr) > len(set(resarr))
 
 286         assert_equal(len(resarr), len(set(resarr)))