1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Output formatters for API version v1.
 
  10 from typing import List, Dict, Mapping, Any
 
  14 from ..utils.json_writer import JsonWriter
 
  15 from ..status import StatusResult
 
  16 from ..results import DetailedResult, ReverseResults, SearchResults, \
 
  17                       AddressLines, AddressLine
 
  18 from ..localization import Locales
 
  19 from ..result_formatting import FormatDispatcher
 
  20 from .classtypes import ICONS
 
  21 from . import format_json, format_xml
 
  22 from .. import logging as loglib
 
  23 from ..server import content_types as ct
 
  26 class RawDataList(List[Dict[str, Any]]):
 
  27     """ Data type for formatting raw data lists 'as is' in json.
 
  31 dispatch = FormatDispatcher({'text': ct.CONTENT_TEXT,
 
  32                              'xml': ct.CONTENT_XML,
 
  33                              'debug': ct.CONTENT_HTML})
 
  36 @dispatch.error_format_func
 
  37 def _format_error(content_type: str, msg: str, status: int) -> str:
 
  38     if content_type == ct.CONTENT_XML:
 
  39         return f"""<?xml version="1.0" encoding="UTF-8" ?>
 
  42                      <message>{msg}</message>
 
  46     if content_type == ct.CONTENT_JSON:
 
  47         return f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
 
  49     if content_type == ct.CONTENT_HTML:
 
  50         loglib.log().section('Execution error')
 
  51         loglib.log().var_dump('Status', status)
 
  52         loglib.log().var_dump('Message', msg)
 
  53         return loglib.get_and_disable()
 
  55     return f"ERROR {status}: {msg}"
 
  58 @dispatch.format_func(StatusResult, 'text')
 
  59 def _format_status_text(result: StatusResult, _: Mapping[str, Any]) -> str:
 
  61         return f"ERROR: {result.message}"
 
  66 @dispatch.format_func(StatusResult, 'json')
 
  67 def _format_status_json(result: StatusResult, _: Mapping[str, Any]) -> str:
 
  71         .keyval('status', result.status)\
 
  72         .keyval('message', result.message)\
 
  73         .keyval_not_none('data_updated', result.data_updated,
 
  74                          lambda v: v.isoformat())\
 
  75         .keyval('software_version', str(result.software_version))\
 
  76         .keyval_not_none('database_version', result.database_version, str)\
 
  82 def _add_address_row(writer: JsonWriter, row: AddressLine,
 
  83                      locales: Locales) -> None:
 
  84     writer.start_object()\
 
  85             .keyval('localname', locales.display_name(row.names))\
 
  86             .keyval_not_none('place_id', row.place_id)
 
  88     if row.osm_object is not None:
 
  89         writer.keyval('osm_id', row.osm_object[1])\
 
  90               .keyval('osm_type', row.osm_object[0])
 
  93         writer.keyval_not_none('place_type', row.extratags.get('place_type'))
 
  95     writer.keyval('class', row.category[0])\
 
  96           .keyval('type', row.category[1])\
 
  97           .keyval_not_none('admin_level', row.admin_level)\
 
  98           .keyval('rank_address', row.rank_address)\
 
  99           .keyval('distance', row.distance)\
 
 100           .keyval('isaddress', row.isaddress)\
 
 104 def _add_address_rows(writer: JsonWriter, section: str, rows: AddressLines,
 
 105                       locales: Locales) -> None:
 
 106     writer.key(section).start_array()
 
 108         _add_address_row(writer, row, locales)
 
 110     writer.end_array().next()
 
 113 def _add_parent_rows_grouped(writer: JsonWriter, rows: AddressLines,
 
 114                              locales: Locales) -> None:
 
 115     # group by category type
 
 116     data = collections.defaultdict(list)
 
 119         _add_address_row(sub, row, locales)
 
 120         data[row.category[1]].append(sub())
 
 122     writer.key('hierarchy').start_object()
 
 123     for group, grouped in data.items():
 
 124         writer.key(group).start_array()
 
 125         grouped.sort()  # sorts alphabetically by local name
 
 127             writer.raw(line).next()
 
 128         writer.end_array().next()
 
 130     writer.end_object().next()
 
 133 @dispatch.format_func(DetailedResult, 'json')
 
 134 def _format_details_json(result: DetailedResult, options: Mapping[str, Any]) -> str:
 
 135     locales = options.get('locales', Locales())
 
 136     geom = result.geometry.get('geojson')
 
 137     centroid = result.centroid.to_geojson()
 
 141         .keyval_not_none('place_id', result.place_id)\
 
 142         .keyval_not_none('parent_place_id', result.parent_place_id)
 
 144     if result.osm_object is not None:
 
 145         out.keyval('osm_type', result.osm_object[0])\
 
 146            .keyval('osm_id', result.osm_object[1])
 
 148     out.keyval('category', result.category[0])\
 
 149        .keyval('type', result.category[1])\
 
 150        .keyval('admin_level', result.admin_level)\
 
 151        .keyval('localname', result.locale_name or '')\
 
 152        .keyval('names', result.names or {})\
 
 153        .keyval('addresstags', result.address or {})\
 
 154        .keyval_not_none('housenumber', result.housenumber)\
 
 155        .keyval_not_none('calculated_postcode', result.postcode)\
 
 156        .keyval_not_none('country_code', result.country_code)\
 
 157        .keyval_not_none('indexed_date', result.indexed_date, lambda v: v.isoformat())\
 
 158        .keyval_not_none('importance', result.importance)\
 
 159        .keyval('calculated_importance', result.calculated_importance())\
 
 160        .keyval('extratags', result.extratags or {})\
 
 161        .keyval_not_none('calculated_wikipedia', result.wikipedia)\
 
 162        .keyval('rank_address', result.rank_address)\
 
 163        .keyval('rank_search', result.rank_search)\
 
 164        .keyval('isarea', 'Polygon' in (geom or result.geometry.get('type') or ''))\
 
 165        .key('centroid').raw(centroid).next()\
 
 166        .key('geometry').raw(geom or centroid).next()
 
 168     if options.get('icon_base_url', None):
 
 169         icon = ICONS.get(result.category)
 
 171             out.keyval('icon', f"{options['icon_base_url']}/{icon}.p.20.png")
 
 173     if result.address_rows is not None:
 
 174         _add_address_rows(out, 'address', result.address_rows, locales)
 
 176     if result.linked_rows:
 
 177         _add_address_rows(out, 'linked_places', result.linked_rows, locales)
 
 179     if result.name_keywords is not None or result.address_keywords is not None:
 
 180         out.key('keywords').start_object()
 
 182         for sec, klist in (('name', result.name_keywords), ('address', result.address_keywords)):
 
 183             out.key(sec).start_array()
 
 184             for word in (klist or []):
 
 186                      .keyval('id', word.word_id)\
 
 187                      .keyval('token', word.word_token)\
 
 189             out.end_array().next()
 
 191         out.end_object().next()
 
 193     if result.parented_rows is not None:
 
 194         if options.get('group_hierarchy', False):
 
 195             _add_parent_rows_grouped(out, result.parented_rows, locales)
 
 197             _add_address_rows(out, 'hierarchy', result.parented_rows, locales)
 
 204 @dispatch.format_func(ReverseResults, 'xml')
 
 205 def _format_reverse_xml(results: ReverseResults, options: Mapping[str, Any]) -> str:
 
 206     return format_xml.format_base_xml(results,
 
 207                                       options, True, 'reversegeocode',
 
 208                                       {'querystring': options.get('query', '')})
 
 211 @dispatch.format_func(ReverseResults, 'geojson')
 
 212 def _format_reverse_geojson(results: ReverseResults,
 
 213                             options: Mapping[str, Any]) -> str:
 
 214     return format_json.format_base_geojson(results, options, True)
 
 217 @dispatch.format_func(ReverseResults, 'geocodejson')
 
 218 def _format_reverse_geocodejson(results: ReverseResults,
 
 219                                 options: Mapping[str, Any]) -> str:
 
 220     return format_json.format_base_geocodejson(results, options, True)
 
 223 @dispatch.format_func(ReverseResults, 'json')
 
 224 def _format_reverse_json(results: ReverseResults,
 
 225                          options: Mapping[str, Any]) -> str:
 
 226     return format_json.format_base_json(results, options, True,
 
 230 @dispatch.format_func(ReverseResults, 'jsonv2')
 
 231 def _format_reverse_jsonv2(results: ReverseResults,
 
 232                            options: Mapping[str, Any]) -> str:
 
 233     return format_json.format_base_json(results, options, True,
 
 234                                         class_label='category')
 
 237 @dispatch.format_func(SearchResults, 'xml')
 
 238 def _format_search_xml(results: SearchResults, options: Mapping[str, Any]) -> str:
 
 239     extra = {'querystring': options.get('query', '')}
 
 240     for attr in ('more_url', 'exclude_place_ids', 'viewbox'):
 
 241         if options.get(attr):
 
 242             extra[attr] = options[attr]
 
 243     return format_xml.format_base_xml(results, options, False, 'searchresults',
 
 247 @dispatch.format_func(SearchResults, 'geojson')
 
 248 def _format_search_geojson(results: SearchResults,
 
 249                            options: Mapping[str, Any]) -> str:
 
 250     return format_json.format_base_geojson(results, options, False)
 
 253 @dispatch.format_func(SearchResults, 'geocodejson')
 
 254 def _format_search_geocodejson(results: SearchResults,
 
 255                                options: Mapping[str, Any]) -> str:
 
 256     return format_json.format_base_geocodejson(results, options, False)
 
 259 @dispatch.format_func(SearchResults, 'json')
 
 260 def _format_search_json(results: SearchResults,
 
 261                         options: Mapping[str, Any]) -> str:
 
 262     return format_json.format_base_json(results, options, False,
 
 266 @dispatch.format_func(SearchResults, 'jsonv2')
 
 267 def _format_search_jsonv2(results: SearchResults,
 
 268                           options: Mapping[str, Any]) -> str:
 
 269     return format_json.format_base_json(results, options, False,
 
 270                                         class_label='category')
 
 273 @dispatch.format_func(RawDataList, 'json')
 
 274 def _format_raw_data_json(results: RawDataList,  _: Mapping[str, Any]) -> str:
 
 279         for k, v in res.items():
 
 280             if isinstance(v, dt.datetime):
 
 281                 out.keyval(k, v.isoformat(sep=' ', timespec='seconds'))
 
 284         out.end_object().next()