--- /dev/null
+"""
+Classes wrapping HTTP responses from the Nominatim API.
+"""
+from collections import OrderedDict
+import re
+import json
+import xml.etree.ElementTree as ET
+
+from check_functions import Almost
+
+def _geojson_result_to_json_result(geojson_result):
+    """ Convert a single GeoJSON feature into the layout of a json result.
+        The feature's properties become the result row (the same dict
+        object is reused and extended), the geometry is stored under
+        'geojson' and a bbox, when present, is reordered into 'boundingbox'.
+    """
+    row = geojson_result['properties']
+    row['geojson'] = geojson_result['geometry']
+
+    if 'bbox' not in geojson_result:
+        return row
+
+    # bbox is minlon, minlat, maxlon, maxlat
+    # boundingbox is minlat, maxlat, minlon, maxlon
+    bbox = geojson_result['bbox']
+    row['boundingbox'] = [bbox[pos] for pos in (1, 3, 0, 2)]
+    return row
+
+class BadRowValueAssert:
+    """ Assertion message for a result field with an unexpected value.
+        The text is only built when the object is converted to a string,
+        so nothing is formatted as long as the assertion holds.
+    """
+
+    def __init__(self, response, idx, field, value):
+        """ Remember the row `idx` of `response` together with the name
+            of the checked `field` and the expected `value`.
+        """
+        self.idx = idx
+        self.field = field
+        self.value = value
+        self.row = response.result[idx]
+
+    def __str__(self):
+        template = "\nBad value for row {} field '{}'. Expected: {}, got: {}.\nFull row: {}"
+        actual = self.row[self.field]
+        full_row = json.dumps(self.row, indent=4)
+        return template.format(self.idx, self.field, self.value, actual, full_row)
+
+
+class GenericResponse:
+    """ Common base class for all API responses.
+
+        The raw page is parsed according to its format. The outcome is
+        available as a list of result rows in `result` and as additional
+        response information in `header`.
+    """
+    def __init__(self, page, fmt, errorcode=200):
+        """ Wrap the response body `page` that was delivered in format `fmt`
+            with the HTTP status `errorcode`. The body is only parsed for
+            status 200, using the method named `_parse_<fmt>`. A format
+            without such a method raises an AttributeError.
+        """
+        self.page = page
+        self.format = fmt
+        self.errorcode = errorcode
+        self.result = []
+        self.header = dict()
+
+        if errorcode == 200:
+            getattr(self, '_parse_' + fmt)()
+
+    def _parse_json(self):
+        """ Parse a JSON or JSONP page into `result`. For JSONP the name of
+            the callback function is saved in the header field 'json_func'.
+            A single top-level object is wrapped into a one-element list.
+        """
+        # DOTALL: the payload of the callback may be spread over several lines.
+        m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page, re.DOTALL)
+        if m is None:
+            code = self.page
+        else:
+            code = m.group(2)
+            self.header['json_func'] = m.group(1)
+        self.result = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code)
+        if isinstance(self.result, OrderedDict):
+            self.result = [self.result]
+
+    def _parse_geojson(self):
+        """ Parse a GeoJSON page and convert each feature into a row that
+            looks like a json result. Error responses and empty pages
+            yield an empty result list.
+        """
+        self._parse_json()
+        if not self.result or 'error' in self.result[0]:
+            self.result = []
+        else:
+            self.result = list(map(_geojson_result_to_json_result, self.result[0]['features']))
+
+    def _parse_geocodejson(self):
+        """ Parse a GeocodeJSON page. The rows consist of the content of
+            the 'geocoding' property of each feature.
+        """
+        self._parse_geojson()
+        # _parse_geojson() always leaves a list behind, possibly empty.
+        self.result = [r['geocoding'] for r in self.result]
+
+    def assert_field(self, idx, field, value):
+        """ Check that result row `idx` has a field `field` with value `value`.
+            Float numbers are matched approximately. When the expected value
+            is a string starting with a caret, regular expression matching
+            is used against the string form of the field. Anything else is
+            compared by string representation.
+        """
+        row = self.result[idx]
+        assert field in row, \
+            "Result row {} has no field '{}'.\nFull row: {}"\
+            .format(idx, field, json.dumps(row, indent=4))
+
+        if isinstance(value, float):
+            assert Almost(value) == float(row[field]), \
+                BadRowValueAssert(self, idx, field, value)
+        elif isinstance(value, str) and value.startswith("^"):
+            # Fields parsed from json may be numbers, re needs a string.
+            assert re.fullmatch(value, str(row[field])), \
+                BadRowValueAssert(self, idx, field, value)
+        else:
+            assert str(row[field]) == str(value), \
+                BadRowValueAssert(self, idx, field, value)
+
+    def match_row(self, row):
+        """ Match the result fields against the given behave table row.
+            With an 'ID' column only the result with that index is checked,
+            otherwise every result must match. The column 'osm' is compared
+            against 'osm_type' and 'osm_id', the column 'centroid' ("lon lat")
+            against 'lon' and 'lat'.
+        """
+        if 'ID' in row.headings:
+            todo = [int(row['ID'])]
+        else:
+            todo = range(len(self.result))
+
+        for i in todo:
+            for name, value in zip(row.headings, row.cells):
+                if name == 'ID':
+                    pass
+                elif name == 'osm':
+                    self.assert_field(i, 'osm_type', value[0])
+                    self.assert_field(i, 'osm_id', value[1:])
+                elif name == 'centroid':
+                    lon, lat = value.split(' ')
+                    self.assert_field(i, 'lat', float(lat))
+                    self.assert_field(i, 'lon', float(lon))
+                else:
+                    self.assert_field(i, name, value)
+
+    def property_list(self, prop):
+        """ Return the value of field `prop` for every result row.
+        """
+        return [x[prop] for x in self.result]
+
+
+class SearchResponse(GenericResponse):
+    """ Response wrapper for search and lookup requests.
+        Adds an XML parser which produces rows similar to the json output.
+    """
+
+    def _parse_xml(self):
+        """ Parse the XML page. The attributes of the root element go into
+            `header`, each <place> child becomes one result row made up of
+            its attributes. Sub-elements that are not extratags, namedetails
+            or geokml are collected into the 'address' field of the row.
+        """
+        root = ET.fromstring(self.page)
+
+        self.header = dict(root.attrib)
+
+        for place_node in root:
+            assert place_node.tag == "place"
+            place = dict(place_node.attrib)
+            self.result.append(place)
+
+            address = {}
+            for node in place_node:
+                if node.tag == 'extratags':
+                    place['extratags'] = {t.attrib['key']: t.attrib['value'] for t in node}
+                elif node.tag == 'namedetails':
+                    place['namedetails'] = {t.attrib['desc']: t.text for t in node}
+                elif node.tag == 'geokml':
+                    # Only the presence of the geometry is recorded.
+                    place[node.tag] = True
+                else:
+                    address[node.tag] = node.text
+
+            if address:
+                place['address'] = address
+
+
+class ReverseResponse(GenericResponse):
+    """ Response wrapper for reverse requests.
+        Adds an XML parser which produces a row similar to the json output.
+    """
+
+    def _parse_xml(self):
+        """ Parse the XML page. The attributes of the root element go into
+            `header`. At most one <result> element is accepted; its
+            attributes form the single result row, which the other
+            recognised children extend. An <error> child is ignored, any
+            other unknown tag fails an assertion.
+        """
+        root = ET.fromstring(self.page)
+
+        self.header = dict(root.attrib)
+        self.result = []
+
+        for node in root:
+            tag = node.tag
+            if tag == 'result':
+                assert not self.result, "More than one result in reverse result"
+                self.result.append(dict(node.attrib))
+            elif tag == 'addressparts':
+                address = {part.tag: part.text for part in node}
+                self.result[0]['address'] = address
+            elif tag == 'extratags':
+                entry = self.result[0]
+                entry['extratags'] = {t.attrib['key']: t.attrib['value'] for t in node}
+            elif tag == 'namedetails':
+                entry = self.result[0]
+                entry['namedetails'] = {t.attrib['desc']: t.text for t in node}
+            elif tag == 'geokml':
+                # Only the presence of the geometry is recorded.
+                self.result[0][tag] = True
+            else:
+                assert tag == 'error', \
+                    "Unknown XML tag {} on page: {}".format(tag, self.page)
+
+
+class StatusResponse(GenericResponse):
+    """ Response wrapper for status requests.
+        Accepts plain text responses in addition to the common formats.
+    """
+
+    def _parse_text(self):
+        """ Accept a text page as it is. Nothing is parsed, the content
+            stays available through `page`.
+        """
-""" Steps that run search queries.
+""" Steps that run queries against the API.
Queries may either be run directly via PHP using the query script
- or via the HTTP interface.
+ or via the HTTP interface using php-cgi.
"""
-
import json
import os
-import io
import re
import logging
-import xml.etree.ElementTree as ET
from urllib.parse import urlencode
-from collections import OrderedDict
-from check_functions import Almost
from utils import run_script
+from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
-logger = logging.getLogger(__name__)
+LOG = logging.getLogger(__name__)
BASE_SERVER_ENV = {
'HTTP_HOST' : 'localhost',
else:
raise Exception("unknown operator '%s'" % operator)
-class GenericResponse(object):
-
- def match_row(self, row):
- if 'ID' in row.headings:
- todo = [int(row['ID'])]
- else:
- todo = range(len(self.result))
-
- for i in todo:
- res = self.result[i]
- for h in row.headings:
- if h == 'ID':
- pass
- elif h == 'osm':
- assert res['osm_type'] == row[h][0]
- assert res['osm_id'] == int(row[h][1:])
- elif h == 'centroid':
- x, y = row[h].split(' ')
- assert Almost(float(y)) == float(res['lat'])
- assert Almost(float(x)) == float(res['lon'])
- elif row[h].startswith("^"):
- assert h in res
- assert re.fullmatch(row[h], res[h]) is not None, \
- "attribute '%s': expected: '%s', got '%s'" % (h, row[h], res[h])
- else:
- assert h in res
- assert str(res[h]) == str(row[h])
-
- def property_list(self, prop):
- return [ x[prop] for x in self.result ]
-
-
-class SearchResponse(GenericResponse):
-
- def __init__(self, page, fmt='json', errorcode=200):
- self.page = page
- self.format = fmt
- self.errorcode = errorcode
- self.result = []
- self.header = dict()
-
- if errorcode == 200:
- getattr(self, 'parse_' + fmt)()
-
- def parse_json(self):
- m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page)
- if m is None:
- code = self.page
- else:
- code = m.group(2)
- self.header['json_func'] = m.group(1)
- self.result = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code)
-
- def parse_geojson(self):
- self.parse_json()
- self.result = geojson_results_to_json_results(self.result)
-
- def parse_geocodejson(self):
- self.parse_geojson()
- if self.result is not None:
- self.result = [r['geocoding'] for r in self.result]
-
- def parse_xml(self):
- et = ET.fromstring(self.page)
-
- self.header = dict(et.attrib)
-
- for child in et:
- assert child.tag == "place"
- self.result.append(dict(child.attrib))
-
- address = {}
- for sub in child:
- if sub.tag == 'extratags':
- self.result[-1]['extratags'] = {}
- for tag in sub:
- self.result[-1]['extratags'][tag.attrib['key']] = tag.attrib['value']
- elif sub.tag == 'namedetails':
- self.result[-1]['namedetails'] = {}
- for tag in sub:
- self.result[-1]['namedetails'][tag.attrib['desc']] = tag.text
- elif sub.tag in ('geokml'):
- self.result[-1][sub.tag] = True
- else:
- address[sub.tag] = sub.text
-
- if len(address) > 0:
- self.result[-1]['address'] = address
-
-
-class ReverseResponse(GenericResponse):
-
- def __init__(self, page, fmt='json', errorcode=200):
- self.page = page
- self.format = fmt
- self.errorcode = errorcode
- self.result = []
- self.header = dict()
-
- if errorcode == 200:
- getattr(self, 'parse_' + fmt)()
-
- def parse_json(self):
- m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page)
- if m is None:
- code = self.page
- else:
- code = m.group(2)
- self.header['json_func'] = m.group(1)
- self.result = [json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code)]
-
- def parse_geojson(self):
- self.parse_json()
- if 'error' in self.result:
- return
- self.result = geojson_results_to_json_results(self.result[0])
-
- def parse_geocodejson(self):
- self.parse_geojson()
- if self.result is not None:
- self.result = [r['geocoding'] for r in self.result]
-
- def parse_xml(self):
- et = ET.fromstring(self.page)
-
- self.header = dict(et.attrib)
- self.result = []
-
- for child in et:
- if child.tag == 'result':
- assert len(self.result) == 0, "More than one result in reverse result"
- self.result.append(dict(child.attrib))
- elif child.tag == 'addressparts':
- address = {}
- for sub in child:
- address[sub.tag] = sub.text
- self.result[0]['address'] = address
- elif child.tag == 'extratags':
- self.result[0]['extratags'] = {}
- for tag in child:
- self.result[0]['extratags'][tag.attrib['key']] = tag.attrib['value']
- elif child.tag == 'namedetails':
- self.result[0]['namedetails'] = {}
- for tag in child:
- self.result[0]['namedetails'][tag.attrib['desc']] = tag.text
- elif child.tag in ('geokml'):
- self.result[0][child.tag] = True
- else:
- assert child.tag == 'error', \
- "Unknown XML tag %s on page: %s" % (child.tag, self.page)
-
-
-class DetailsResponse(GenericResponse):
-
- def __init__(self, page, fmt='json', errorcode=200):
- self.page = page
- self.format = fmt
- self.errorcode = errorcode
- self.result = []
- self.header = dict()
-
- if errorcode == 200:
- getattr(self, 'parse_' + fmt)()
-
- def parse_json(self):
- self.result = [json.JSONDecoder(object_pairs_hook=OrderedDict).decode(self.page)]
-
-
-class StatusResponse(GenericResponse):
-
- def __init__(self, page, fmt='text', errorcode=200):
- self.page = page
- self.format = fmt
- self.errorcode = errorcode
-
- if errorcode == 200 and fmt != 'text':
- getattr(self, 'parse_' + fmt)()
-
- def parse_json(self):
- self.result = [json.JSONDecoder(object_pairs_hook=OrderedDict).decode(self.page)]
-
-
-def geojson_result_to_json_result(geojson_result):
- result = geojson_result['properties']
- result['geojson'] = geojson_result['geometry']
- if 'bbox' in geojson_result:
- # bbox is minlon, minlat, maxlon, maxlat
- # boundingbox is minlat, maxlat, minlon, maxlon
- result['boundingbox'] = [
- geojson_result['bbox'][1],
- geojson_result['bbox'][3],
- geojson_result['bbox'][0],
- geojson_result['bbox'][2]
- ]
- return result
-
-
-def geojson_results_to_json_results(geojson_results):
- if 'error' in geojson_results:
- return
- return list(map(geojson_result_to_json_result, geojson_results['features']))
-
@when(u'searching for "(?P<query>.*)"(?P<dups> with dups)?')
def query_cmd(context, query, dups):
env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
'%s.php' % endpoint)
- logger.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
+ LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
if hasattr(context, 'http_headers'):
env.update(context.http_headers)
else:
outfmt = fmt.strip()
- context.response = DetailsResponse(outp, outfmt, status)
+ context.response = GenericResponse(outp, outfmt, status)
@when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
def website_lookup_request(context, fmt, query):