1 """ Steps for checking the results of queries.
 
   4 from nose.tools import *
 
   6 from tidylib import tidy_document
 
   7 from collections import OrderedDict
 
  11 from xml.dom.minidom import parseString
 
  13 logger = logging.getLogger(__name__)
 
  16     """ Puts the DOM structure into more convenient python
 
  17         with a similar structure as the json document, so
 
  18         that the same the semantics can be used. It does not
 
  19         check if the content is valid (or at least not more than
 
  20         necessary to transform it into a dict structure).
 
  22     page = parseString(world.page).documentElement
 
  25     world.result_header = OrderedDict(page.attributes.items())
 
  26     logger.debug('Result header: %r' % (world.result_header))
 
  30     if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
 
  31         for node in page.childNodes:
 
  32             if node.nodeName != "#text":
 
  33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
 
  34                 newresult = OrderedDict(node.attributes.items())
 
  35                 assert_not_in('address', newresult)
 
  36                 assert_not_in('geokml', newresult)
 
  37                 assert_not_in('extratags', newresult)
 
  38                 assert_not_in('namedetails', newresult)
 
  39                 address = OrderedDict()
 
  40                 for sub in node.childNodes:
 
  41                     if sub.nodeName == 'geokml':
 
  42                         newresult['geokml'] = sub.childNodes[0].toxml()
 
  43                     elif sub.nodeName == 'extratags':
 
  44                         newresult['extratags'] = {}
 
  45                         for tag in sub.childNodes:
 
  46                             assert_equals(tag.nodeName, 'tag')
 
  47                             attrs = dict(tag.attributes.items())
 
  48                             assert_in('key', attrs)
 
  49                             assert_in('value', attrs)
 
  50                             newresult['extratags'][attrs['key']] = attrs['value']
 
  51                     elif sub.nodeName == 'namedetails':
 
  52                         newresult['namedetails'] = {}
 
  53                         for tag in sub.childNodes:
 
  54                             assert_equals(tag.nodeName, 'name')
 
  55                             attrs = dict(tag.attributes.items())
 
  56                             assert_in('desc', attrs)
 
  57                             newresult['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
 
  59                     elif sub.nodeName == '#text':
 
  62                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
 
  64                     newresult['address'] = address
 
  65                 world.results.append(newresult)
 
  66     elif page.nodeName == 'reversegeocode':
 
  69         for node in page.childNodes:
 
  70             if node.nodeName == 'result':
 
  71                 assert_equals(len(world.results), 0)
 
  73                 world.results.append(OrderedDict(node.attributes.items()))
 
  74                 assert_not_in('display_name', world.results[0])
 
  75                 assert_not_in('address', world.results[0])
 
  76                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
 
  77             elif node.nodeName == 'error':
 
  78                 assert_equals(len(world.results), 0)
 
  80             elif node.nodeName == 'addressparts':
 
  82                 address = OrderedDict()
 
  83                 for sub in node.childNodes:
 
  84                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
 
  85                 world.results[0]['address'] = address
 
  86             elif node.nodeName == 'extratags':
 
  87                 world.results[0]['extratags'] = {}
 
  88                 for tag in node.childNodes:
 
  89                     assert_equals(tag.nodeName, 'tag')
 
  90                     attrs = dict(tag.attributes.items())
 
  91                     assert_in('key', attrs)
 
  92                     assert_in('value', attrs)
 
  93                     world.results[0]['extratags'][attrs['key']] = attrs['value']
 
  94             elif node.nodeName == 'namedetails':
 
  95                 world.results[0]['namedetails'] = {}
 
  96                 for tag in node.childNodes:
 
  97                     assert_equals(tag.nodeName, 'name')
 
  98                     attrs = dict(tag.attributes.items())
 
  99                     assert_in('desc', attrs)
 
 100                     world.results[0]['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
 
 101             elif node.nodeName == "#text":
 
 104                 assert False, "Unknown content '%s' in XML" % node.nodeName
 
 106         assert False, "Unknown document node name %s in XML" % page.nodeName
 
 108     logger.debug("The following was parsed out of XML:")
 
 109     logger.debug(world.results)
 
 111 @step(u'a HTTP (\d+) is returned')
 
 112 def api_result_http_error(step, error):
 
 113     assert_equals(world.returncode, int(error))
 
 115 @step(u'the result is valid( \w+)?')
 
 116 def api_result_is_valid(step, fmt):
 
 117     assert_equals(world.returncode, 200)
 
 119     if world.response_format == 'html':
 
 120         document, errors = tidy_document(world.page, 
 
 121                              options={'char-encoding' : 'utf8'})
 
 122         # assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
 
 123         world.results = document
 
 124     elif world.response_format == 'xml':
 
 126     elif world.response_format == 'json':
 
 127         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
 
 128         if world.request_type == 'reverse':
 
 129             world.results = (world.results,)
 
 131         assert False, "Unknown page format: %s" % (world.response_format)
 
 134         assert_equals (fmt.strip(), world.response_format)
 
 137 def compare(operator, op1, op2):
 
 138     if operator == 'less than':
 
 140     elif operator == 'more than':
 
 142     elif operator == 'exactly':
 
 144     elif operator == 'at least':
 
 146     elif operator == 'at most':
 
 149         raise Exception("unknown operator '%s'" % operator)
 
 151 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
 
 152 def validate_result_number(step, operator, number):
 
 153     step.given('the result is valid')
 
 154     numres = len(world.results)
 
 155     assert compare(operator, numres, int(number)), \
 
 156         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
 
 158 @step(u'result (\d+) has( not)? attributes (\S+)')
 
 159 def search_check_for_result_attribute(step, num, invalid, attrs):
 
 161     step.given('at least %d results are returned' % (num + 1))
 
 162     res = world.results[num]
 
 163     for attr in attrs.split(','):
 
 165             assert_not_in(attr.strip(), res)
 
 167             assert_in(attr.strip(),res)
 
 169 @step(u'there is a json wrapper "([^"]*)"')
 
 170 def api_result_check_json_wrapper(step, wrapper):
 
 171     step.given('the result is valid json')
 
 172     assert_equals(world.json_callback, wrapper)
 
 174 @step(u'result header contains')
 
 175 def api_result_header_contains(step):
 
 176     step.given('the result is valid')
 
 177     for line in step.hashes:
 
 178         assert_in(line['attr'], world.result_header)
 
 179         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
 
 181 @step(u'result header has no attribute (.*)')
 
 182 def api_result_header_contains_not(step, attr):
 
 183     step.given('the result is valid')
 
 184     assert_not_in(attr, world.result_header)
 
 186 @step(u'results contain$')
 
 187 def api_result_contains(step):
 
 188     step.given('at least 1 result is returned')
 
 189     for line in step.hashes:
 
 191             reslist = (world.results[int(line['ID'])],)
 
 193             reslist = world.results
 
 194         for k,v in line.iteritems():
 
 196                 for curres in reslist:
 
 197                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
 
 199                 for curres in reslist:
 
 202                         # mathematical operation
 
 203                         evalexp = '%s %s' % (curres[k], v)
 
 205                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
 
 206                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
 
 209                         m = re.match("%s$" % (v,), curres[k])
 
 210                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
 
 213 @step(u'result addresses contain$')
 
 214 def api_result_address_contains(step):
 
 215     step.given('the result is valid')
 
 216     for line in step.hashes:
 
 218             reslist = (world.results[int(line['ID'])],)
 
 220             reslist = world.results
 
 221         for k,v in line.iteritems():
 
 224                     curres = res['address']
 
 226                     m = re.match("%s$" % (v,), curres[k])
 
 227                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
 
 230 @step(u'address of result (\d+) contains')
 
 231 def api_result_address_exact(step, resid):
 
 233     step.given('at least %d results are returned' % (resid + 1))
 
 234     addr = world.results[resid]['address']
 
 235     for line in step.hashes:
 
 236         assert_in(line['type'], addr)
 
 237         m = re.match("%s$" % line['value'], addr[line['type']])
 
 238         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
 
 239                                   line['type'], line['value'], addr[line['type']]))
 
 240         #assert_equals(line['value'], addr[line['type']])
 
 242 @step(u'address of result (\d+) does not contain (.*)')
 
 243 def api_result_address_details_missing(step, resid, types):
 
 245     step.given('at least %d results are returned' % (resid + 1))
 
 246     addr = world.results[resid]['address']
 
 247     for t in types.split(','):
 
 248         assert_not_in(t.strip(), addr)
 
 251 @step(u'address of result (\d+) is')
 
 252 def api_result_address_exact(step, resid):
 
 254     step.given('at least %d results are returned' % (resid + 1))
 
 255     result = world.results[resid]
 
 257     assert_equals(len(step.hashes), len(result['address']))
 
 258     for k,v in result['address'].iteritems():
 
 259         assert_equals(step.hashes[linenr]['type'], k)
 
 260         assert_equals(step.hashes[linenr]['value'], v)
 
 264 @step('there are( no)? duplicates')
 
 265 def api_result_check_for_duplicates(step, nodups=None):
 
 266     step.given('at least 1 result is returned')
 
 268     for res in world.results:
 
 269         resarr.append((res['osm_type'], res['class'],
 
 270                         res['type'], res['display_name']))
 
 273         assert len(resarr) > len(set(resarr))
 
 275         assert_equal(len(resarr), len(set(resarr)))