1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for specialised logging with HTML output.
10 from typing import Any, Iterator, Optional, List, Tuple, cast, Union, Mapping, Sequence
11 from contextvars import ContextVar
18 import sqlalchemy as sa
19 from sqlalchemy.ext.asyncio import AsyncConnection
22 from pygments import highlight
23 from pygments.lexers import PythonLexer, PostgresLexer
24 from pygments.formatters import HtmlFormatter
26 except ModuleNotFoundError:
27 CODE_HIGHLIGHT = False
30 def _debug_name(res: Any) -> str:
32 return cast(str, res.names.get('name', next(iter(res.names.values()))))
34 return f"Hnr {res.housenumber}" if res.housenumber is not None else '[NONE]'
class BaseLogger:
    """ Interface for logging function.

        The base implementation does nothing. Override the functions
        in derived classes which implement logging functionality.
    """
    def get_buffer(self) -> str:
        """ Return the current content of the log buffer.
        """
        return ''

    def function(self, func: str, **kwargs: Any) -> None:
        """ Start a new debug chapter for the given function and its parameters.
        """

    def section(self, heading: str) -> None:
        """ Start a new section with the given title.
        """

    def comment(self, text: str) -> None:
        """ Add a simple comment to the debug output.
        """

    def var_dump(self, heading: str, var: Any) -> None:
        """ Print the content of the variable to the debug output prefixed by
            the given heading.
        """

    def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
        """ Print the table generated by the generator function.
        """

    def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
        """ Print a list of search results generated by the generator function.
        """

    def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
            params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
        """ Print the SQL for the given statement.
        """

    def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable',
                   extra_params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]
                   ) -> str:
        """ Return the compiled version of the statement.

            The statement is compiled against the engine of `conn` and the
            bound parameters are written into the SQL text. Values from
            `extra_params` take precedence over the parameters of the
            compiled statement. When a sequence of parameter sets is given,
            the parameters are only shown as named placeholders.

            Only the 'postgresql' and 'sqlite' dialects are supported.
        """
        compiled = cast('sa.ClauseElement', statement).compile(conn.sync_engine)

        params = dict(compiled.params)
        if isinstance(extra_params, Mapping):
            for k, v in extra_params.items():
                if hasattr(v, 'to_wkt'):
                    # objects that know their WKT form are shown as WKT
                    params[k] = v.to_wkt()
                elif isinstance(v, (int, float)):
                    params[k] = v
                else:
                    params[k] = str(v)
        elif isinstance(extra_params, Sequence) and extra_params:
            # Multiple parameter sets: show the key of each parameter
            # instead of a concrete value.
            for k in extra_params[0]:
                params[k] = f':{k}'

        sqlstr = str(compiled)

        if conn.dialect.name == 'postgresql':
            if sa.__version__.startswith('1'):
                # SQLAlchemy 1.x: parameters are filled in by position.
                try:
                    sqlstr = re.sub(r'__\[POSTCOMPILE_[^]]*\]', '%s', sqlstr)
                    return sqlstr % tuple((repr(params.get(name, None))
                                          for name in compiled.positiontup))  # type: ignore
                except TypeError:
                    # Best effort: hand back the statement as it is when
                    # the interpolation fails.
                    return sqlstr

            # Later versions: parameters are filled in by name.
            sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', r'%(\1)s', sqlstr)
            return sqlstr % params

        assert conn.dialect.name == 'sqlite'

        # params in positional order
        pparams = (repr(params.get(name, None)) for name in compiled.positiontup)  # type: ignore

        sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', '?', sqlstr)
        # A question mark without a remaining parameter is left untouched
        # instead of aborting the debug output with StopIteration.
        sqlstr = re.sub(r"\?", lambda m: next(pparams, m.group(0)), sqlstr)

        return sqlstr
class HTMLLogger(BaseLogger):
    """ Logger that formats messages in HTML.

        The output is collected in an in-memory buffer. get_buffer()
        returns it wrapped into HTML_HEADER and HTML_FOOTER.
    """
    def __init__(self) -> None:
        self.buffer = io.StringIO()

    def _timestamp(self) -> None:
        """ Add a paragraph with the current time to the debug output.
        """
        self._write(f'<p class="timestamp">[{dt.datetime.now()}]</p>')

    def get_buffer(self) -> str:
        return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER

    def function(self, func: str, **kwargs: Any) -> None:
        self._timestamp()
        self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
        for name, value in kwargs.items():
            self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
        self._write('</dl></p>')

    def section(self, heading: str) -> None:
        self._timestamp()
        self._write(f"<h2>{heading}</h2>")

    def comment(self, text: str) -> None:
        self._timestamp()
        self._write(f"<p>{text}</p>")

    def var_dump(self, heading: str, var: Any) -> None:
        self._timestamp()
        # A callable is invoked to produce the value to be printed.
        if callable(var):
            var = var()

        self._write(f'<h5>{heading}</h5>{self._python_var(var)}')

    def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
        self._timestamp()
        # The first item of the generator provides the column titles.
        head = next(rows)
        assert head
        self._write(f'<table><thead><tr><th colspan="{len(head)}">{heading}</th></tr><tr>')
        for cell in head:
            self._write(f'<th>{cell}</th>')
        self._write('</tr></thead><tbody>')
        for row in rows:
            # None entries produce no output in the HTML table.
            if row is not None:
                self._write('<tr>')
                for cell in row:
                    self._write(f'<td>{cell}</td>')
                self._write('</tr>')
        self._write('</tbody></table>')

    def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
        """ Print a list of search results generated by the generator function.

            Each item of `results` is a tuple of a ranking value and the
            result. The list is followed by the total number of results.
        """
        self._timestamp()

        def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
            """ Return the OSM object as a link to openstreetmap.org,
                as plain text when the type is unknown or as '-' when
                there is no object.
            """
            if not osm_object:
                return '-'

            t, i = osm_object
            if t == 'N':
                fullt = 'node'
            elif t == 'W':
                fullt = 'way'
            elif t == 'R':
                fullt = 'relation'
            else:
                return f'{t}{i}'

            return f'<a href="https://www.openstreetmap.org/{fullt}/{i}">{t}{i}</a>'

        self._write(f'<h5>{heading}</h5><p><dl>')
        total = 0
        for rank, res in results:
            # Names and categories come from the database and may contain
            # characters with a special meaning in HTML.
            name = html.escape(_debug_name(res))
            category = html.escape(','.join(res.category))
            self._write(f'<dt>[{rank:.3f}]</dt> <dd>{res.source_table.name}(')
            self._write(f"{name}, type=({category}), ")
            self._write(f"rank={res.rank_address}, ")
            self._write(f"osm={format_osm(res.osm_object)}, ")
            self._write(f'cc={res.country_code}, ')
            self._write(f'importance={res.importance or float("nan"):.5f})</dd>')
            total += 1
        self._write(f'</dl><b>TOTAL:</b> {total}</p>')

    def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
            params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
        self._timestamp()
        sqlstr = self.format_sql(conn, statement, params)
        if CODE_HIGHLIGHT:
            # highlight() already returns HTML, so no escaping here
            sqlstr = highlight(sqlstr, PostgresLexer(),
                               HtmlFormatter(nowrap=True, lineseparator='<br />'))
            self._write(f'<div class="highlight"><code class="lang-sql">{sqlstr}</code></div>')
        else:
            self._write(f'<code class="lang-sql">{html.escape(sqlstr)}</code>')

    def _python_var(self, var: Any) -> str:
        """ Return the string representation of the variable as HTML,
            with syntax highlighting when available.
        """
        if CODE_HIGHLIGHT:
            fmt = highlight(str(var), PythonLexer(), HtmlFormatter(nowrap=True))
            return f'<div class="highlight"><code class="lang-python">{fmt}</code></div>'

        return f'<code class="lang-python">{html.escape(str(var))}</code>'

    def _write(self, text: str) -> None:
        """ Add the raw text to the debug output.
        """
        self.buffer.write(text)
class TextLogger(BaseLogger):
    """ Logger creating output suitable for the console.

        The output is collected in an in-memory buffer and returned
        unchanged by get_buffer().
    """
    def __init__(self) -> None:
        self.buffer = io.StringIO()

    def _timestamp(self) -> None:
        """ Add a line with the current time to the debug output.
        """
        self._write(f'[{dt.datetime.now()}]\n')

    def get_buffer(self) -> str:
        return self.buffer.getvalue()

    def function(self, func: str, **kwargs: Any) -> None:
        self._write(f"#### Debug output for {func}()\n\nParameters:\n")
        for name, value in kwargs.items():
            self._write(f' {name}: {self._python_var(value)}\n')
        self._write('\n')

    def section(self, heading: str) -> None:
        self._timestamp()
        self._write(f"\n# {heading}\n\n")

    def comment(self, text: str) -> None:
        self._write(f"{text}\n")

    def var_dump(self, heading: str, var: Any) -> None:
        # A callable is invoked to produce the value to be printed.
        if callable(var):
            var = var()

        self._write(f'{heading}:\n {self._python_var(var)}\n\n')

    def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
        self._write(f'{heading}:\n')
        # The generator is consumed completely because the column widths
        # depend on all rows. Empty entries are kept as None and later
        # printed as separator lines.
        data = [list(map(self._python_var, row)) if row else None for row in rows]
        # The first entry is the table header.
        assert data[0] is not None
        num_cols = len(data[0])

        maxlens = [max(len(d[i]) for d in data if d) for i in range(num_cols)]
        # each column adds its separator and two padding spaces,
        # plus the closing border
        tablewidth = sum(maxlens) + 3 * num_cols + 1
        row_format = '| ' + ' | '.join(f'{{:<{ln}}}' for ln in maxlens) + ' |\n'
        self._write('-'*tablewidth + '\n')
        self._write(row_format.format(*data[0]))
        self._write('-'*tablewidth + '\n')
        for row in data[1:]:
            if row:
                self._write(row_format.format(*row))
            else:
                self._write('-'*tablewidth + '\n')
        # Close the table unless the last entry already drew a separator.
        if data[-1]:
            self._write('-'*tablewidth + '\n')

    def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
        self._timestamp()
        self._write(f'{heading}:\n')
        total = 0
        for rank, res in results:
            self._write(f'[{rank:.3f}] {res.source_table.name}(')
            self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
            self._write(f"rank={res.rank_address}, ")
            self._write(f"osm={''.join(map(str, res.osm_object or []))}, ")
            self._write(f'cc={res.country_code}, ')
            self._write(f'importance={res.importance or -1:.5f})\n')
            total += 1
        self._write(f'TOTAL: {total}\n\n')

    def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
            params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
        self._timestamp()
        # wrap the statement and prefix each line with a bar
        sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement, params), width=78))
        self._write(f"| {sqlstr}\n\n")

    def _python_var(self, var: Any) -> str:
        """ Return the plain string representation of the variable.
        """
        return str(var)

    def _write(self, text: str) -> None:
        """ Add the raw text to the debug output.
        """
        self.buffer.write(text)
# Logger of the current execution context. The default is a BaseLogger,
# which does nothing, so no debug output is collected until a different
# logger is installed with set_log_output().
logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())
def set_log_output(fmt: str) -> None:
    """ Enable collecting debug information.

        Installs a new logger for the current context: an HTMLLogger
        for the format 'html' and a TextLogger for the format 'text'.
        Any other value installs a BaseLogger, which does nothing.
    """
    # NOTE(review): the format names 'html' and 'text' need to be confirmed
    # against the callers of this function.
    if fmt == 'html':
        logger.set(HTMLLogger())
    elif fmt == 'text':
        logger.set(TextLogger())
    else:
        logger.set(BaseLogger())
def log() -> BaseLogger:
    """ Return the logger for the current context.
    """
    return logger.get()
def get_and_disable() -> str:
    """ Return the current content of the debug buffer and disable logging.

        Logging is disabled by replacing the logger of the current
        context with a new BaseLogger.
    """
    buf = logger.get().get_buffer()
    logger.set(BaseLogger())
    return buf
# Opening part of the HTML debug page: doctype, page title and an inline
# stylesheet. When CODE_HIGHLIGHT is set, the style definitions of the
# pygments HtmlFormatter for the '.highlight' class are added to the
# stylesheet, otherwise an empty string is inserted in their place.
# HTMLLogger.get_buffer() puts this text in front of the collected output.
336 HTML_HEADER: str = """<!DOCTYPE html>
339 <title>Nominatim - Debug</title>
342 (HtmlFormatter(nobackground=True).get_style_defs('.highlight') # type: ignore[no-untyped-call]
343 if CODE_HIGHLIGHT else '') + \
345 h2 { font-size: x-large }
349 font-family: monospace
358 dt::after { content: ": "; }
371 border: solid lightgrey 0.1pt;
373 background-color: #f7f7f7
378 border: solid lightgrey 0.1pt
383 border-collapse: collapse;
386 border-right: thin solid;
394 width: calc(100% - 5pt);
# Closing tags of the HTML debug page. HTMLLogger.get_buffer() appends
# this text after the collected output.
405 HTML_FOOTER: str = "</body></html>"