1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Various helper classes for running Nominatim commands.
11 from collections import namedtuple
13 APIResponse = namedtuple('APIResponse', ['endpoint', 'status', 'body', 'headers'])
17 """ Execute a call to an API endpoint.
19 def __init__(self, environ, api_engine):
20 create_func = getattr(self, f"create_engine_{api_engine}")
21 self.exec_engine = create_func(environ)
23 def run(self, endpoint, params, http_headers):
24 return asyncio.run(self.exec_engine(endpoint, params, http_headers))
26 def run_step(self, endpoint, base_params, datatable, fmt, http_headers):
28 base_params['format'] = fmt.strip()
31 if datatable[0] == ['param', 'value']:
32 base_params.update(datatable[1:])
34 base_params.update(zip(datatable[0], datatable[1]))
36 return self.run(endpoint, base_params, http_headers)
38 def create_engine_falcon(self, environ):
39 import nominatim_api.server.falcon.server
42 async def exec_engine_falcon(endpoint, params, http_headers):
43 app = nominatim_api.server.falcon.server.get_application(None, environ)
45 async with falcon.testing.ASGIConductor(app) as conductor:
46 response = await conductor.get("/" + endpoint, params=params,
49 return APIResponse(endpoint, response.status_code,
50 response.text, response.headers)
52 return exec_engine_falcon
54 def create_engine_starlette(self, environ):
55 import nominatim_api.server.starlette.server
56 from asgi_lifespan import LifespanManager
59 async def _request(endpoint, params, http_headers):
60 app = nominatim_api.server.starlette.server.get_application(None, environ)
62 async with LifespanManager(app):
63 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
64 response = await client.get("/" + endpoint, params=params,
67 return APIResponse(endpoint, response.status_code,
68 response.text, response.headers)