From 14d17a3ad452c3b370beac1c2ab7b17e0f566982 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Fri, 9 Jan 2015 23:16:53 +0100 Subject: [PATCH] properly close connection in test --- tests/steps/db_results.py | 84 +++++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/tests/steps/db_results.py b/tests/steps/db_results.py index f65e9924..2566418e 100644 --- a/tests/steps/db_results.py +++ b/tests/steps/db_results.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) @step(u'table placex contains as names for (N|R|W)(\d+)') def check_placex_names(step, osmtyp, osmid): - """ Check for the exact content of the name hstaore in placex. + """ Check for the exact content of the name hstore in placex. """ cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid))) @@ -43,47 +43,55 @@ def check_placex_content(step, tablename): given columns are tested. If there is more than one line for an OSM object, they must match in these columns. """ - cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) - for line in step.hashes: - osmtype, osmid, cls = world.split_id(line['object']) - q = 'SELECT *' - if tablename == 'placex': - q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" - q = q + ", ST_GeometryType(geometry) as geometrytype" - q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) - if cls is None: - params = (osmtype, osmid) - else: - q = q + ' and class = %s' - params = (osmtype, osmid, cls) - cur.execute(q, params) - assert(cur.rowcount > 0) - for res in cur: - for k,v in line.iteritems(): - if not k == 'object': - assert_in(k, res) - if type(res[k]) is dict: - val = world.make_hash(v) - assert_equals(res[k], val) - elif k in ('parent_place_id', 'linked_place_id'): - pid = world.get_placeid(v) - assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) - elif k == 'centroid': - world.match_geometry((res['clat'], res['clon']), v) - else: - assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + try: + cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + for line in step.hashes: + osmtype, osmid, cls = world.split_id(line['object']) + q = 'SELECT *' + if tablename == 'placex': + q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" + q = q + ", ST_GeometryType(geometry) as geometrytype" + q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) + if cls is None: + params = (osmtype, osmid) + else: + q = q + ' and class = %s' + params = (osmtype, osmid, cls) + cur.execute(q, params) + assert(cur.rowcount > 0) + for res in cur: + for k,v in line.iteritems(): + if not k == 'object': + assert_in(k, res) + if type(res[k]) is dict: + val = world.make_hash(v) + assert_equals(res[k], val) + elif k in ('parent_place_id', 'linked_place_id'): + pid = world.get_placeid(v) + assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) + elif k == 'centroid': + world.match_geometry((res['clat'], res['clon']), v) + else: + assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + finally: + cur.close() + world.conn.commit() @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?') def check_placex_missing(step, tablename, osmtyp, osmid, placeclass): cur = world.conn.cursor() - q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) - args = [osmtyp, int(osmid)] - if placeclass is not None: - q = q + ' and class = %s' - args.append(placeclass[1:]) - cur.execute(q, args) - numres = cur.fetchone()[0] - assert_equals (numres, 0) + try: + q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) + args = [osmtyp, int(osmid)] + if placeclass is not None: + q = q + ' and class = %s' + args.append(placeclass[1:]) + cur.execute(q, args) + numres = cur.fetchone()[0] + assert_equals (numres, 0) + finally: + cur.close() + world.conn.commit() @step(u'search_name table contains$') def check_search_name_content(step): -- 2.45.2