1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for specialised logging with HTML output.
10 from typing import Any, Iterator, Optional, List, Tuple, cast, Union, Mapping, Sequence
11 from contextvars import ContextVar
17 import sqlalchemy as sa
18 from sqlalchemy.ext.asyncio import AsyncConnection
21 from pygments import highlight
22 from pygments.lexers import PythonLexer, PostgresLexer
23 from pygments.formatters import HtmlFormatter
25 except ModuleNotFoundError:
26 CODE_HIGHLIGHT = False
29 def _debug_name(res: Any) -> str:
31 return cast(str, res.names.get('name', next(iter(res.names.values()))))
33 return f"Hnr {res.housenumber}" if res.housenumber is not None else '[NONE]'
37 """ Interface for logging function.
39 The base implementation does nothing. Override the functions
40 in derived classes which implement logging functionality.
42 def get_buffer(self) -> str:
43 """ Return the current content of the log buffer.
47 def function(self, func: str, **kwargs: Any) -> None:
48 """ Start a new debug chapter for the given function and its parameters.
52 def section(self, heading: str) -> None:
53 """ Start a new section with the given title.
57 def comment(self, text: str) -> None:
58 """ Add a simple comment to the debug output.
62 def var_dump(self, heading: str, var: Any) -> None:
63 """ Print the content of the variable to the debug output prefixed by
68 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
69 """ Print the table generated by the generator function.
73 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
74 """ Print a list of search results generated by the generator function.
78 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
79 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
80 """ Print the SQL for the given statement.
83 def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable',
84 extra_params: Union[Mapping[str, Any],
85 Sequence[Mapping[str, Any]], None]) -> str:
86 """ Return the compiled version of the statement.
88 compiled = cast('sa.ClauseElement', statement).compile(conn.sync_engine)
90 params = dict(compiled.params)
91 if isinstance(extra_params, Mapping):
92 for k, v in extra_params.items():
94 elif isinstance(extra_params, Sequence) and extra_params:
95 for k in extra_params[0]:
98 sqlstr = str(compiled)
100 if sa.__version__.startswith('1'):
102 sqlstr = re.sub(r'__\[POSTCOMPILE_[^]]*\]', '%s', sqlstr)
103 return sqlstr % tuple((repr(params.get(name, None))
104 for name in compiled.positiontup)) # type: ignore
108 # Fixes an odd issue with Python 3.7 where percentages are not
110 sqlstr = re.sub(r'%(?!\()', '%%', sqlstr)
111 sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', r'%(\1)s', sqlstr)
112 return sqlstr % params
114 class HTMLLogger(BaseLogger):
115 """ Logger that formats messages in HTML.
117 def __init__(self) -> None:
118 self.buffer = io.StringIO()
121 def _timestamp(self) -> None:
122 self._write(f'<p class="timestamp">[{dt.datetime.now()}]</p>')
def get_buffer(self) -> str:
    """ Return the collected output framed by the HTML header and footer.
    """
    collected = self.buffer.getvalue()
    return ''.join((HTML_HEADER, collected, HTML_FOOTER))
129 def function(self, func: str, **kwargs: Any) -> None:
131 self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
132 for name, value in kwargs.items():
133 self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
134 self._write('</dl></p>')
137 def section(self, heading: str) -> None:
139 self._write(f"<h2>{heading}</h2>")
142 def comment(self, text: str) -> None:
144 self._write(f"<p>{text}</p>")
147 def var_dump(self, heading: str, var: Any) -> None:
152 self._write(f'<h5>{heading}</h5>{self._python_var(var)}')
155 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
159 self._write(f'<table><thead><tr><th colspan="{len(head)}">{heading}</th></tr><tr>')
161 self._write(f'<th>{cell}</th>')
162 self._write('</tr></thead><tbody>')
167 self._write(f'<td>{cell}</td>')
169 self._write('</tbody></table>')
172 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
173 """ Print a list of search results generated by the generator function.
176 def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
190 return f'<a href="https://www.openstreetmap.org/{fullt}/{i}">{t}{i}</a>'
192 self._write(f'<h5>{heading}</h5><p><dl>')
194 for rank, res in results:
195 self._write(f'<dt>[{rank:.3f}]</dt> <dd>{res.source_table.name}(')
196 self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
197 self._write(f"rank={res.rank_address}, ")
198 self._write(f"osm={format_osm(res.osm_object)}, ")
199 self._write(f'cc={res.country_code}, ')
200 self._write(f'importance={res.importance or float("nan"):.5f})</dd>')
202 self._write(f'</dl><b>TOTAL:</b> {total}</p>')
205 def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
206 params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
208 sqlstr = self.format_sql(conn, statement, params)
210 sqlstr = highlight(sqlstr, PostgresLexer(),
211 HtmlFormatter(nowrap=True, lineseparator='<br />'))
212 self._write(f'<div class="highlight"><code class="lang-sql">{sqlstr}</code></div>')
214 self._write(f'<code class="lang-sql">{sqlstr}</code>')
def _python_var(self, var: Any) -> str:
    """ Return the string form of `var` wrapped in HTML code markup.

        When code highlighting is available, the text is formatted with
        the Python lexer and returned inside a 'highlight' block.
        Otherwise the text is returned as plain code. In that case it is
        HTML-escaped first, so that characters like '<', '>' or '&' in
        the value cannot break the surrounding markup.
    """
    text = str(var)

    if CODE_HIGHLIGHT:
        fmt = highlight(text, PythonLexer(), HtmlFormatter(nowrap=True))
        return f'<div class="highlight"><code class="lang-python">{fmt}</code></div>'

    # Imported locally because the file's import block is not fully
    # visible from this method.
    from html import escape

    return f'<code class="lang-python">{escape(text)}</code>'
225 def _write(self, text: str) -> None:
226 """ Add the raw text to the debug output.
228 self.buffer.write(text)
231 class TextLogger(BaseLogger):
232 """ Logger creating output suitable for the console.
234 def __init__(self) -> None:
235 self.buffer = io.StringIO()
238 def get_buffer(self) -> str:
239 return self.buffer.getvalue()
242 def function(self, func: str, **kwargs: Any) -> None:
243 self._write(f"#### Debug output for {func}()\n\nParameters:\n")
244 for name, value in kwargs.items():
245 self._write(f' {name}: {self._python_var(value)}\n')
249 def section(self, heading: str) -> None:
250 self._write(f"\n# {heading}\n\n")
253 def comment(self, text: str) -> None:
254 self._write(f"{text}\n")
257 def var_dump(self, heading: str, var: Any) -> None:
261 self._write(f'{heading}:\n {self._python_var(var)}\n\n')
264 def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
265 self._write(f'{heading}:\n')
266 data = [list(map(self._python_var, row)) if row else None for row in rows]
267 assert data[0] is not None
268 num_cols = len(data[0])
270 maxlens = [max(len(d[i]) for d in data if d) for i in range(num_cols)]
271 tablewidth = sum(maxlens) + 3 * num_cols + 1
272 row_format = '| ' +' | '.join(f'{{:<{l}}}' for l in maxlens) + ' |\n'
273 self._write('-'*tablewidth + '\n')
274 self._write(row_format.format(*data[0]))
275 self._write('-'*tablewidth + '\n')
278 self._write(row_format.format(*row))
280 self._write('-'*tablewidth + '\n')
282 self._write('-'*tablewidth + '\n')
285 def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
286 self._write(f'{heading}:\n')
288 for rank, res in results:
289 self._write(f'[{rank:.3f}] {res.source_table.name}(')
290 self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
291 self._write(f"rank={res.rank_address}, ")
292 self._write(f"osm={''.join(map(str, res.osm_object or []))}, ")
293 self._write(f'cc={res.country_code}, ')
294 self._write(f'importance={res.importance or -1:.5f})\n')
296 self._write(f'TOTAL: {total}\n\n')
def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
        params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
    """ Write the formatted SQL of the statement to the output.

        The text returned by format_sql() is wrapped at 78 characters.
        Each resulting line is prefixed with '| ' and the block is
        followed by an empty line.
    """
    rendered = self.format_sql(conn, statement, params)
    wrapped_lines = textwrap.wrap(rendered, width=78)
    body = '\n| '.join(wrapped_lines)
    self._write(f"| {body}\n\n")
305 def _python_var(self, var: Any) -> str:
309 def _write(self, text: str) -> None:
310 self.buffer.write(text)
# Context variable holding the logger of the current context. Its default
# is a plain BaseLogger instance, which is what a context gets when no
# other logger has been set.
logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())
316 def set_log_output(fmt: str) -> None:
317 """ Enable collecting debug information.
320 logger.set(HTMLLogger())
322 logger.set(TextLogger())
324 logger.set(BaseLogger())
327 def log() -> BaseLogger:
328 """ Return the logger for the current context.
333 def get_and_disable() -> str:
334 """ Return the current content of the debug buffer and disable logging.
336 buf = logger.get().get_buffer()
337 logger.set(BaseLogger())
341 HTML_HEADER: str = """<!DOCTYPE html>
344 <title>Nominatim - Debug</title>
347 (HtmlFormatter(nobackground=True).get_style_defs('.highlight') if CODE_HIGHLIGHT else '') +\
349 h2 { font-size: x-large }
353 font-family: monospace
362 dt::after { content: ": "; }
375 border: solid lightgrey 0.1pt;
377 background-color: #f7f7f7
382 border: solid lightgrey 0.1pt
387 border-collapse: collapse;
390 border-right: thin solid;
398 width: calc(100% - 5pt);
# Closing tags that HTMLLogger.get_buffer() appends after the buffered output.
HTML_FOOTER: str = "</body></html>"