1 """ Steps for checking the DB after import and update tests.
 
   3     There are two groups of test here. The first group tests
 
   4     the contents of db tables directly, the second checks
 
   5     query results by using the command line query tool.
 
   8 from nose.tools import *
 
  11 import psycopg2.extensions
 
  12 import psycopg2.extras
 
  18 from collections import OrderedDict
 
  20 logger = logging.getLogger(__name__)
 
  22 @step(u'table placex contains as names for (N|R|W)(\d+)')
 
  23 def check_placex_names(step, osmtyp, osmid):
 
  24     """ Check for the exact content of the name hstore in placex.
 
  26     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
 
  27     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
 
  29         names = dict(line['name'])
 
  30         for name in step.hashes:
 
  31             assert_in(name['k'], names)
 
  32             assert_equals(names[name['k']], name['v'])
 
  34         assert_equals(len(names), 0)
 
  39 @step(u'table ([a-z_]+) contains$')
 
  40 def check_placex_content(step, tablename):
 
  41     """ check that the given lines are in the given table
 
  42         Entries are searched by osm_type/osm_id and then all
 
  43         given columns are tested. If there is more than one
 
  44         line for an OSM object, they must match in these columns.
 
  47         cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
 
  48         for line in step.hashes:
 
  49             osmtype, osmid, cls = world.split_id(line['object'])
 
  51             if tablename == 'placex':
 
  52                 q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
 
  53             if tablename == 'location_property_osmline':
 
  54                 q = q + ' FROM %s where osm_id = %%s' % (tablename,)
 
  56                 q = q + ", ST_GeometryType(geometry) as geometrytype"
 
  57                 q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
 
  59                 if tablename == 'location_property_osmline':
 
  62                     params = (osmtype, osmid)
 
  64                 q = q + ' and class = %s'
 
  65                 if tablename == 'location_property_osmline':
 
  68                     params = (osmtype, osmid, cls)
 
  69             cur.execute(q, params)
 
  70             assert(cur.rowcount > 0)
 
  72                 for k,v in line.iteritems():
 
  75                         if type(res[k]) is dict:
 
  76                             val = world.make_hash(v)
 
  77                             assert_equals(res[k], val)
 
  78                         elif k in ('parent_place_id', 'linked_place_id'):
 
  79                             pid = world.get_placeid(v)
 
  80                             assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
 
  82                             world.match_geometry((res['clat'], res['clon']), v)
 
  84                             assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
 
  89 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
 
  90 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
 
  91     cur = world.conn.cursor()
 
  93         q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
 
  94         args = [osmtyp, int(osmid)]
 
  95         if placeclass is not None:
 
  96             q = q + ' and class = %s'
 
  97             args.append(placeclass[1:])
 
  99         numres = cur.fetchone()[0]
 
 100         assert_equals (numres, 0)
 
 105 @step(u'table location_property_osmline has no entry for W(\d+)?')
 
 106 def check_osmline_missing(step, osmid):
 
 107     cur = world.conn.cursor()
 
 109         q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, )
 
 111         numres = cur.fetchone()[0]
 
 112         assert_equals (numres, 0)
 
 117 @step(u'search_name table contains$')
 
 118 def check_search_name_content(step):
 
 119     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
 
 120     for line in step.hashes:
 
 121         placeid = world.get_placeid(line['place_id'])
 
 122         cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
 
 123         assert(cur.rowcount > 0)
 
 125             for k,v in line.iteritems():
 
 126                 if k in ('search_rank', 'address_rank'):
 
 127                     assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
 
 128                 elif k in ('importance'):
 
 129                     assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
 
 130                 elif k in ('name_vector', 'nameaddress_vector'):
 
 131                     terms = [x.strip().replace('#', ' ') for x in v.split(',')]
 
 132                     cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
 
 133                     assert cur.rowcount >= len(terms)
 
 135                         assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
 
 136                 elif k in ('country_code'):
 
 137                     assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
 
 138                 elif k == 'place_id':
 
 141                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
 
 143 @step(u'way (\d+) expands to lines')
 
 144 def check_interpolation_lines(step, wayid):
 
 145     """Check that the correct interpolation line has been entered in
 
 146        location_property_osmline for the given source line/nodes.
 
 147        Expected are three columns:
 
 148        startnumber, endnumber and linegeo
 
 151     for line in step.hashes:
 
 152         lines.append((line["startnumber"], line["endnumber"], line["geometry"]))
 
 153     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
 
 154     cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
 
 155                    FROM location_property_osmline WHERE osm_id = %s""",
 
 157     assert_equals(len(lines), cur.rowcount)
 
 159         linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ')
 
 160         exp = (r["startnumber"], r["endnumber"], linegeo)
 
 161         assert_in(exp, lines)
 
 164 @step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
 
 165 def check_interpolated_housenumber_list(step, nodeid, numberlist):
 
 166     """ Checks that the interpolated house numbers corresponds
 
 169     expected = numberlist.split(',');
 
 170     cur = world.conn.cursor()
 
 171     cur.execute("""SELECT housenumber FROM placex
 
 172                    WHERE osm_type = 'W' and osm_id = %s
 
 173                    and class = 'place' and type = 'address'""", (int(nodeid),))
 
 175         assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
 
 176         expected.remove(r[0])
 
 177     assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
 
 179 @step(u'way (\d+) expands to no housenumbers')
 
 180 def check_no_interpolated_housenumber_list(step, nodeid):
 
 181     """ Checks that the interpolated house numbers corresponds
 
 184     cur = world.conn.cursor()
 
 185     cur.execute("""SELECT housenumber FROM placex
 
 186                    WHERE osm_type = 'W' and osm_id = %s
 
 187                      and class = 'place' and type = 'address'""", (int(nodeid),))
 
 188     res = [r[0] for r in cur]
 
 189     assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
 
 191 @step(u'table search_name has no entry for (.*)')
 
 192 def check_placex_missing(step, osmid):
 
 193     """ Checks if there is an entry in the search index for the
 
 196     cur = world.conn.cursor()
 
 197     placeid = world.get_placeid(osmid)
 
 198     cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
 
 199     numres = cur.fetchone()[0]
 
 200     assert_equals (numres, 0)