1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions for specialised logging with HTML output.
 
  10 from typing import Any, Iterator, Optional, List, Tuple, cast, Union, Mapping, Sequence
 
  11 from contextvars import ContextVar
 
  18 import sqlalchemy as sa
 
  19 from sqlalchemy.ext.asyncio import AsyncConnection
 
  22     from pygments import highlight
 
  23     from pygments.lexers import PythonLexer, PostgresLexer
 
  24     from pygments.formatters import HtmlFormatter
 
  26 except ModuleNotFoundError:
 
  27     CODE_HIGHLIGHT = False
 
  30 def _debug_name(res: Any) -> str:
 
  32         return cast(str, res.names.get('name', next(iter(res.names.values()))))
 
  34     return f"Hnr {res.housenumber}" if res.housenumber is not None else '[NONE]'
 
  38     """ Interface for logging function.
 
  40         The base implementation does nothing. Overwrite the functions
 
  41         in derived classes which implement logging functionality.
 
  43     def get_buffer(self) -> str:
 
  44         """ Return the current content of the log buffer.
 
  48     def function(self, func: str, **kwargs: Any) -> None:
 
  49         """ Start a new debug chapter for the given function and its parameters.
 
  52     def section(self, heading: str) -> None:
 
  53         """ Start a new section with the given title.
 
  56     def comment(self, text: str) -> None:
 
  57         """ Add a simple comment to the debug output.
 
  60     def var_dump(self, heading: str, var: Any) -> None:
 
  61         """ Print the content of the variable to the debug output prefixed by
 
  65     def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
 
  66         """ Print the table generated by the generator function.
 
  69     def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
 
  70         """ Print a list of search results generated by the generator function.
 
  73     def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
 
  74             params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
 
  75         """ Print the SQL for the given statement.
 
  78     def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable',
 
  79                    extra_params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]
 
  81         """ Return the compiled version of the statement.
 
  83         compiled = cast('sa.ClauseElement', statement).compile(conn.sync_engine)
 
  85         params = dict(compiled.params)
 
  86         if isinstance(extra_params, Mapping):
 
  87             for k, v in extra_params.items():
 
  88                 if hasattr(v, 'to_wkt'):
 
  89                     params[k] = v.to_wkt()
 
  90                 elif isinstance(v, (int, float)):
 
  94         elif isinstance(extra_params, Sequence) and extra_params:
 
  95             for k in extra_params[0]:
 
  98         sqlstr = str(compiled)
 
 100         if conn.dialect.name == 'postgresql':
 
 101             if sa.__version__.startswith('1'):
 
 103                     sqlstr = re.sub(r'__\[POSTCOMPILE_[^]]*\]', '%s', sqlstr)
 
 104                     return sqlstr % tuple((repr(params.get(name, None))
 
 105                                           for name in compiled.positiontup))  # type: ignore
 
 109             sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', r'%(\1)s', sqlstr)
 
 110             return sqlstr % params
 
 112         assert conn.dialect.name == 'sqlite'
 
 114         # params in positional order
 
 115         pparams = (repr(params.get(name, None)) for name in compiled.positiontup)  # type: ignore
 
 117         sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', '?', sqlstr)
 
 118         sqlstr = re.sub(r"\?", lambda m: next(pparams), sqlstr)
 
 123 class HTMLLogger(BaseLogger):
 
 124     """ Logger that formats messages in HTML.
 
 126     def __init__(self) -> None:
 
 127         self.buffer = io.StringIO()
 
 129     def _timestamp(self) -> None:
 
 130         self._write(f'<p class="timestamp">[{dt.datetime.now()}]</p>')
 
 132     def get_buffer(self) -> str:
 
 133         return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER
 
 135     def function(self, func: str, **kwargs: Any) -> None:
 
 137         self._write(f"<h1>Debug output for {func}()</h1>\n<p>Parameters:<dl>")
 
 138         for name, value in kwargs.items():
 
 139             self._write(f'<dt>{name}</dt><dd>{self._python_var(value)}</dd>')
 
 140         self._write('</dl></p>')
 
 142     def section(self, heading: str) -> None:
 
 144         self._write(f"<h2>{heading}</h2>")
 
 146     def comment(self, text: str) -> None:
 
 148         self._write(f"<p>{text}</p>")
 
 150     def var_dump(self, heading: str, var: Any) -> None:
 
 155         self._write(f'<h5>{heading}</h5>{self._python_var(var)}')
 
 157     def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
 
 161         self._write(f'<table><thead><tr><th colspan="{len(head)}">{heading}</th></tr><tr>')
 
 163             self._write(f'<th>{cell}</th>')
 
 164         self._write('</tr></thead><tbody>')
 
 169                     self._write(f'<td>{cell}</td>')
 
 171         self._write('</tbody></table>')
 
 173     def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
 
 174         """ Print a list of search results generated by the generator function.
 
 178         def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
 
 192             return f'<a href="https://www.openstreetmap.org/{fullt}/{i}">{t}{i}</a>'
 
 194         self._write(f'<h5>{heading}</h5><p><dl>')
 
 196         for rank, res in results:
 
 197             self._write(f'<dt>[{rank:.3f}]</dt>  <dd>{res.source_table.name}(')
 
 198             self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
 
 199             self._write(f"rank={res.rank_address}, ")
 
 200             self._write(f"osm={format_osm(res.osm_object)}, ")
 
 201             self._write(f'cc={res.country_code}, ')
 
 202             self._write(f'importance={res.importance or float("nan"):.5f})</dd>')
 
 204         self._write(f'</dl><b>TOTAL:</b> {total}</p>')
 
 206     def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
 
 207             params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
 
 209         sqlstr = self.format_sql(conn, statement, params)
 
 211             sqlstr = highlight(sqlstr, PostgresLexer(),
 
 212                                HtmlFormatter(nowrap=True, lineseparator='<br />'))
 
 213             self._write(f'<div class="highlight"><code class="lang-sql">{sqlstr}</code></div>')
 
 215             self._write(f'<code class="lang-sql">{html.escape(sqlstr)}</code>')
 
 217     def _python_var(self, var: Any) -> str:
 
 219             fmt = highlight(str(var), PythonLexer(), HtmlFormatter(nowrap=True))
 
 220             return f'<div class="highlight"><code class="lang-python">{fmt}</code></div>'
 
 222         return f'<code class="lang-python">{html.escape(str(var))}</code>'
 
 224     def _write(self, text: str) -> None:
 
 225         """ Add the raw text to the debug output.
 
 227         self.buffer.write(text)
 
 230 class TextLogger(BaseLogger):
 
 231     """ Logger creating output suitable for the console.
 
 233     def __init__(self) -> None:
 
 234         self.buffer = io.StringIO()
 
 236     def _timestamp(self) -> None:
 
 237         self._write(f'[{dt.datetime.now()}]\n')
 
 239     def get_buffer(self) -> str:
 
 240         return self.buffer.getvalue()
 
 242     def function(self, func: str, **kwargs: Any) -> None:
 
 243         self._write(f"#### Debug output for {func}()\n\nParameters:\n")
 
 244         for name, value in kwargs.items():
 
 245             self._write(f'  {name}: {self._python_var(value)}\n')
 
 248     def section(self, heading: str) -> None:
 
 250         self._write(f"\n# {heading}\n\n")
 
 252     def comment(self, text: str) -> None:
 
 253         self._write(f"{text}\n")
 
 255     def var_dump(self, heading: str, var: Any) -> None:
 
 259         self._write(f'{heading}:\n  {self._python_var(var)}\n\n')
 
 261     def table_dump(self, heading: str, rows: Iterator[Optional[List[Any]]]) -> None:
 
 262         self._write(f'{heading}:\n')
 
 263         data = [list(map(self._python_var, row)) if row else None for row in rows]
 
 264         assert data[0] is not None
 
 265         num_cols = len(data[0])
 
 267         maxlens = [max(len(d[i]) for d in data if d) for i in range(num_cols)]
 
 268         tablewidth = sum(maxlens) + 3 * num_cols + 1
 
 269         row_format = '| ' + ' | '.join(f'{{:<{ln}}}' for ln in maxlens) + ' |\n'
 
 270         self._write('-'*tablewidth + '\n')
 
 271         self._write(row_format.format(*data[0]))
 
 272         self._write('-'*tablewidth + '\n')
 
 275                 self._write(row_format.format(*row))
 
 277                 self._write('-'*tablewidth + '\n')
 
 279             self._write('-'*tablewidth + '\n')
 
 281     def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
 
 283         self._write(f'{heading}:\n')
 
 285         for rank, res in results:
 
 286             self._write(f'[{rank:.3f}]  {res.source_table.name}(')
 
 287             self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
 
 288             self._write(f"rank={res.rank_address}, ")
 
 289             self._write(f"osm={''.join(map(str, res.osm_object or []))}, ")
 
 290             self._write(f'cc={res.country_code}, ')
 
 291             self._write(f'importance={res.importance or -1:.5f})\n')
 
 293         self._write(f'TOTAL: {total}\n\n')
 
 295     def sql(self, conn: AsyncConnection, statement: 'sa.Executable',
 
 296             params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None]) -> None:
 
 298         sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement, params), width=78))
 
 299         self._write(f"| {sqlstr}\n\n")
 
 301     def _python_var(self, var: Any) -> str:
 
 304     def _write(self, text: str) -> None:
 
 305         self.buffer.write(text)
 
 308 logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())
 
 311 def set_log_output(fmt: str) -> None:
 
 312     """ Enable collecting debug information.
 
 315         logger.set(HTMLLogger())
 
 317         logger.set(TextLogger())
 
 319         logger.set(BaseLogger())
 
 322 def log() -> BaseLogger:
 
 323     """ Return the logger for the current context.
 
 328 def get_and_disable() -> str:
 
 329     """ Return the current content of the debug buffer and disable logging.
 
 331     buf = logger.get().get_buffer()
 
 332     logger.set(BaseLogger())
 
 336 HTML_HEADER: str = """<!DOCTYPE html>
 
 339   <title>Nominatim - Debug</title>
 
 342     (HtmlFormatter(nobackground=True).get_style_defs('.highlight')  # type: ignore[no-untyped-call]
 
 343      if CODE_HIGHLIGHT else '') + \
 
 345     h2 { font-size: x-large }
 
 349       font-family: monospace
 
 358     dt::after { content: ": "; }
 
 371         border: solid lightgrey 0.1pt;
 
 373         background-color: #f7f7f7
 
 378         border: solid lightgrey 0.1pt
 
 383         border-collapse: collapse;
 
 386         border-right: thin solid;
 
 394         width: calc(100% - 5pt);
 
 405 HTML_FOOTER: str = "</body></html>"