else()
message (STATUS "Using PHP binary " ${PHP_BIN})
endif()
- if (NOT PHPCGI_BIN)
- find_program (PHPCGI_BIN php-cgi)
- endif()
- # sanity check if PHP binary exists
- if (NOT EXISTS ${PHPCGI_BIN})
- message(WARNING "php-cgi binary not found. nominatim tool will not provide query functions.")
- set (PHPCGI_BIN "")
- else()
- message (STATUS "Using php-cgi binary " ${PHPCGI_BIN})
- endif()
endif()
#-----------------------------------------------------------------------------
version.GIT_COMMIT_HASH = '@GIT_HASH@'
exit(cli.nominatim(module_dir='@NOMINATIM_LIBDIR@/module',
- osm2pgsql_path='@NOMINATIM_LIBDIR@/osm2pgsql',
- phpcgi_path='@PHPCGI_BIN@'))
+ osm2pgsql_path='@NOMINATIM_LIBDIR@/osm2pgsql'))
version.GIT_COMMIT_HASH = '@GIT_HASH@'
exit(cli.nominatim(module_dir='@CMAKE_BINARY_DIR@/module',
- osm2pgsql_path='@CMAKE_BINARY_DIR@/osm2pgsql/osm2pgsql',
- phpcgi_path='@PHPCGI_BIN@'))
+ osm2pgsql_path='@CMAKE_BINARY_DIR@/osm2pgsql/osm2pgsql'))
* [PHP](https://php.net) (7.3+)
* PHP-pgsql
* PHP-intl (bundled with PHP)
- * PHP-cgi (for running queries from the command line)
For running continuous updates:
+++ /dev/null
-<?php
-/**
- * SPDX-License-Identifier: GPL-2.0-only
- *
- * This file is part of Nominatim. (https://nominatim.org)
- *
- * Copyright (C) 2022 by the Nominatim developer community.
- * For a full list of authors see the git log.
- */
- @define('CONST_LibDir', dirname(dirname(__FILE__)));
- // Script to extract structured city and street data
- // from a running nominatim instance as CSV data
-
-
- require_once(CONST_LibDir.'/init-cmd.php');
- require_once(CONST_LibDir.'/ParameterParser.php');
- ini_set('memory_limit', '800M');
-
- $aCMDOptions = array(
- 'Export addresses as CSV file from a Nominatim database',
- array('help', 'h', 0, 1, 0, 0, false, 'Show Help'),
- array('quiet', 'q', 0, 1, 0, 0, 'bool', 'Quiet output'),
- array('verbose', 'v', 0, 1, 0, 0, 'bool', 'Verbose output'),
-
- array('output-type', '', 0, 1, 1, 1, 'str', 'Type of places to output (see below)'),
- array('output-format', '', 0, 1, 1, 1, 'str', 'Column mapping (see below)'),
- array('output-all-postcodes', '', 0, 1, 0, 0, 'bool', 'List all postcodes for address instead of just the most likely one'),
- array('language', '', 0, 1, 1, 1, 'str', 'Preferred language for output (local name, if omitted)'),
- array('restrict-to-country', '', 0, 1, 1, 1, 'str', 'Export only objects within country (country code)'),
- array('restrict-to-osm-node', '', 0, 1, 1, 1, 'int', 'Export only objects that are children of this OSM node'),
- array('restrict-to-osm-way', '', 0, 1, 1, 1, 'int', 'Export only objects that are children of this OSM way'),
- array('restrict-to-osm-relation', '', 0, 1, 1, 1, 'int', 'Export only objects that are children of this OSM relation'),
- array('project-dir', '', 0, 1, 1, 1, 'realpath', 'Base directory of the Nominatim installation (default: .)'),
- "\nAddress ranks: continent, country, state, county, city, suburb, street, path",
- 'Additional output types: postcode, placeid (placeid for each object)',
- "\noutput-format must be a semicolon-separated list of address ranks. Multiple ranks",
- 'can be merged into one column by simply using a comma-separated list.',
- "\nDefault output-type: street",
- 'Default output format: street;suburb;city;county;state;country'
- );
- getCmdOpt($_SERVER['argv'], $aCMDOptions, $aCMDResult, true, true);
-
- loadSettings($aCMDResult['project-dir'] ?? getcwd());
-
- $aRankmap = array(
- 'continent' => 1,
- 'country' => 4,
- 'state' => 8,
- 'county' => 12,
- 'city' => 16,
- 'suburb' => 20,
- 'street' => 26,
- 'path' => 27
- );
-
- $oDB = new Nominatim\DB();
- $oDB->connect();
-
- if (isset($aCMDResult['output-type'])) {
- if (!isset($aRankmap[$aCMDResult['output-type']])) {
- fail('unknown output-type: '.$aCMDResult['output-type']);
- }
- $iOutputRank = $aRankmap[$aCMDResult['output-type']];
- } else {
- $iOutputRank = $aRankmap['street'];
- }
-
-
- // Preferred language
- $oParams = new Nominatim\ParameterParser();
- if (!isset($aCMDResult['language'])) {
- $aCMDResult['language'] = 'xx';
- }
- $aLangPrefOrder = $oParams->getPreferredLanguages($aCMDResult['language']);
- $sLanguagePrefArraySQL = $oDB->getArraySQL($oDB->getDBQuotedList($aLangPrefOrder));
-
- // output formatting: build up a lookup table that maps address ranks to columns
- $aColumnMapping = array();
- $iNumCol = 0;
- if (!isset($aCMDResult['output-format'])) {
- $aCMDResult['output-format'] = 'street;suburb;city;county;state;country';
- }
- foreach (preg_split('/\s*;\s*/', $aCMDResult['output-format']) as $sColumn) {
- $bHasData = false;
- foreach (preg_split('/\s*,\s*/', $sColumn) as $sRank) {
- if ($sRank == 'postcode' || $sRank == 'placeid') {
- $aColumnMapping[$sRank] = $iNumCol;
- $bHasData = true;
- } elseif (isset($aRankmap[$sRank])) {
- $iRank = $aRankmap[$sRank];
- if ($iRank <= $iOutputRank) {
- $aColumnMapping[(string)$iRank] = $iNumCol;
- $bHasData = true;
- }
- }
- }
- if ($bHasData) {
- $iNumCol++;
- }
- }
-
- // build the query for objects
- $sPlacexSQL = 'select min(place_id) as place_id, ';
- $sPlacexSQL .= 'array_agg(place_id) as place_ids, ';
- $sPlacexSQL .= 'country_code as cc, ';
- $sPlacexSQL .= 'postcode, ';
- // get the address places excluding postcodes
- $sPlacexSQL .= 'array(select address_place_id from place_addressline a';
- $sPlacexSQL .= ' where a.place_id = placex.place_id and isaddress';
- $sPlacexSQL .= ' and address_place_id != placex.place_id';
- $sPlacexSQL .= ' and not cached_rank_address in (5,11)';
- $sPlacexSQL .= ' and cached_rank_address > 2 order by cached_rank_address)';
- $sPlacexSQL .= ' as address';
- $sPlacexSQL .= ' from placex where name is not null and linked_place_id is null';
-
- $sPlacexSQL .= ' and rank_address = '.$iOutputRank;
-
- if (isset($aCMDResult['restrict-to-country'])) {
- $sPlacexSQL .= ' and country_code = '.$oDB->getDBQuoted($aCMDResult['restrict-to-country']);
- }
-
- // restriction to parent place id
- $sParentId = false;
- $sOsmType = false;
-
- if (isset($aCMDResult['restrict-to-osm-node'])) {
- $sOsmType = 'N';
- $sOsmId = $aCMDResult['restrict-to-osm-node'];
- }
- if (isset($aCMDResult['restrict-to-osm-way'])) {
- $sOsmType = 'W';
- $sOsmId = $aCMDResult['restrict-to-osm-way'];
- }
- if (isset($aCMDResult['restrict-to-osm-relation'])) {
- $sOsmType = 'R';
- $sOsmId = $aCMDResult['restrict-to-osm-relation'];
- }
- if ($sOsmType) {
- $sSQL = 'select place_id from placex where osm_type = :osm_type and osm_id = :osm_id';
- $sParentId = $oDB->getOne($sSQL, array('osm_type' => $sOsmType, 'osm_id' => $sOsmId));
- if (!$sParentId) {
- fail('Could not find place '.$sOsmType.' '.$sOsmId);
- }
- }
- if ($sParentId) {
- $sPlacexSQL .= ' and place_id in (select place_id from place_addressline where address_place_id = '.$sParentId.' and isaddress)';
- }
-
- $sPlacexSQL .= " group by name->'name', address, postcode, country_code, placex.place_id";
-
- // Iterate over placeids
- // to get further hierarchical information
- //var_dump($sPlacexSQL);
- $oResults = $oDB->getQueryStatement($sPlacexSQL);
- $fOutstream = fopen('php://output', 'w');
- while ($aRow = $oResults->fetch()) {
- $iPlaceID = $aRow['place_id'];
- $sSQL = "select rank_address,get_name_by_language(name,$sLanguagePrefArraySQL) as localname from get_addressdata(:place_id, -1)";
- $sSQL .= ' WHERE isaddress';
- $sSQL .= ' order by rank_address desc,isaddress desc';
- $aAddressLines = $oDB->getAll($sSQL, array('place_id' => $iPlaceID));
-
- $aOutput = array_fill(0, $iNumCol, '');
- // output address parts
- foreach ($aAddressLines as $aAddress) {
- if (isset($aColumnMapping[$aAddress['rank_address']])) {
- $aOutput[$aColumnMapping[$aAddress['rank_address']]] = $aAddress['localname'];
- }
- }
- // output postcode
- if (isset($aColumnMapping['postcode'])) {
- if ($aCMDResult['output-all-postcodes']) {
- $sSQL = 'select array_agg(px.postcode) from placex px join place_addressline pa ';
- $sSQL .= 'on px.place_id = pa.address_place_id ';
- $sSQL .= 'where pa.cached_rank_address in (5,11) ';
- $sSQL .= 'and pa.place_id in (select place_id from place_addressline where address_place_id in (:first_place_id)) ';
- $sSQL .= 'group by postcode order by count(*) desc limit 1';
- $sRes = $oDB->getOne($sSQL, array('first_place_id' => substr($aRow['place_ids'], 1, -1)));
-
- $aOutput[$aColumnMapping['postcode']] = substr($sRes, 1, -1);
- } else {
- $aOutput[$aColumnMapping['postcode']] = $aRow['postcode'];
- }
- }
- if (isset($aColumnMapping['placeid'])) {
- $aOutput[$aColumnMapping['placeid']] = substr($aRow['place_ids'], 1, -1);
- }
- fputcsv($fOutstream, $aOutput);
- }
- fclose($fOutstream);
+++ /dev/null
-<?php
-/**
- * SPDX-License-Identifier: GPL-2.0-only
- *
- * This file is part of Nominatim. (https://nominatim.org)
- *
- * Copyright (C) 2022 by the Nominatim developer community.
- * For a full list of authors see the git log.
- */
-@define('CONST_LibDir', dirname(dirname(__FILE__)));
-
-require_once(CONST_LibDir.'/init-cmd.php');
-require_once(CONST_LibDir.'/log.php');
-require_once(CONST_LibDir.'/PlaceLookup.php');
-require_once(CONST_LibDir.'/ReverseGeocode.php');
-
-ini_set('memory_limit', '800M');
-
-$aCMDOptions = array(
- 'Tools to warm nominatim db',
- array('help', 'h', 0, 1, 0, 0, false, 'Show Help'),
- array('quiet', 'q', 0, 1, 0, 0, 'bool', 'Quiet output'),
- array('verbose', 'v', 0, 1, 0, 0, 'bool', 'Verbose output'),
- array('reverse-only', '', 0, 1, 0, 0, 'bool', 'Warm reverse only'),
- array('search-only', '', 0, 1, 0, 0, 'bool', 'Warm search only'),
- array('project-dir', '', 0, 1, 1, 1, 'realpath', 'Base directory of the Nominatim installation (default: .)'),
- );
-getCmdOpt($_SERVER['argv'], $aCMDOptions, $aResult, true, true);
-
-loadSettings($aCMDResult['project-dir'] ?? getcwd());
-
-@define('CONST_Database_DSN', getSetting('DATABASE_DSN'));
-@define('CONST_Default_Language', getSetting('DEFAULT_LANGUAGE', false));
-@define('CONST_Log_DB', getSettingBool('LOG_DB'));
-@define('CONST_Log_File', getSetting('LOG_FILE', false));
-@define('CONST_NoAccessControl', getSettingBool('CORS_NOACCESSCONTROL'));
-@define('CONST_Places_Max_ID_count', getSetting('LOOKUP_MAX_COUNT'));
-@define('CONST_PolygonOutput_MaximumTypes', getSetting('POLYGON_OUTPUT_MAX_TYPES'));
-@define('CONST_Search_BatchMode', getSettingBool('SEARCH_BATCH_MODE'));
-@define('CONST_Search_NameOnlySearchFrequencyThreshold', getSetting('SEARCH_NAME_ONLY_THRESHOLD'));
-@define('CONST_Use_US_Tiger_Data', getSettingBool('USE_US_TIGER_DATA'));
-@define('CONST_MapIcon_URL', getSetting('MAPICON_URL', false));
-@define('CONST_TokenizerDir', CONST_InstallDir.'/tokenizer');
-@define('CONST_Search_WithinCountries', getSetting('SEARCH_WITHIN_COUNTRIES', false));
-
-require_once(CONST_LibDir.'/Geocode.php');
-
-$oDB = new Nominatim\DB();
-$oDB->connect();
-
-$bVerbose = $aResult['verbose'];
-
-function print_results($aResults, $bVerbose)
-{
- if ($bVerbose) {
- if ($aResults && count($aResults)) {
- echo $aResults[0]['langaddress']."\n";
- } else {
- echo "<not found>\n";
- }
- } else {
- echo '.';
- }
-}
-
-if (!$aResult['search-only']) {
- $oReverseGeocode = new Nominatim\ReverseGeocode($oDB);
- $oReverseGeocode->setZoom(20);
- $oPlaceLookup = new Nominatim\PlaceLookup($oDB);
- $oPlaceLookup->setIncludeAddressDetails(true);
- $oPlaceLookup->setLanguagePreference(array('en'));
-
- echo 'Warm reverse: ';
- if ($bVerbose) {
- echo "\n";
- }
- for ($i = 0; $i < 1000; $i++) {
- $fLat = rand(-9000, 9000) / 100;
- $fLon = rand(-18000, 18000) / 100;
- if ($bVerbose) {
- echo "$fLat, $fLon = ";
- }
-
- $oLookup = $oReverseGeocode->lookup($fLat, $fLon);
- $aSearchResults = $oLookup ? $oPlaceLookup->lookup(array($oLookup->iId => $oLookup)) : null;
- print_results($aSearchResults, $bVerbose);
- }
- echo "\n";
-}
-
-if (!$aResult['reverse-only']) {
- $oGeocode = new Nominatim\Geocode($oDB);
-
- echo 'Warm search: ';
- if ($bVerbose) {
- echo "\n";
- }
-
- $oTokenizer = new \Nominatim\Tokenizer($oDB);
-
- $aWords = $oTokenizer->mostFrequentWords(1000);
-
- $sSQL = 'SELECT word FROM word WHERE word is not null ORDER BY search_name_count DESC LIMIT 1000';
- foreach ($aWords as $sWord) {
- if ($bVerbose) {
- echo "$sWord = ";
- }
-
- $oGeocode->setLanguagePreference(array('en'));
- $oGeocode->setQuery($sWord);
- $aSearchResults = $oGeocode->lookup();
- print_results($aSearchResults, $bVerbose);
- }
- echo "\n";
-}
from nominatim.cli import get_set_parser
def get_parser():
- parser = get_set_parser(phpcgi_path='@PHPCGI_BIN@')
+ parser = get_set_parser()
return parser.parser
from .core import (NominatimAPI as NominatimAPI,
NominatimAPIAsync as NominatimAPIAsync)
+from .connection import (SearchConnection as SearchConnection)
from .status import (StatusResult as StatusResult)
from .types import (PlaceID as PlaceID,
OsmID as OsmID,
"""
from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
import asyncio
+import sys
import contextlib
from pathlib import Path
""" API loader asynchronous version.
"""
def __init__(self, project_dir: Path,
- environ: Optional[Mapping[str, str]] = None) -> None:
+ environ: Optional[Mapping[str, str]] = None,
+ loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
self.config = Configuration(project_dir, environ)
self.server_version = 0
- self._engine_lock = asyncio.Lock()
+ if sys.version_info >= (3, 10):
+ self._engine_lock = asyncio.Lock()
+ else:
+ self._engine_lock = asyncio.Lock(loop=loop) # pylint: disable=unexpected-keyword-arg
self._engine: Optional[sa_asyncio.AsyncEngine] = None
self._tables: Optional[SearchTables] = None
self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
def __init__(self, project_dir: Path,
environ: Optional[Mapping[str, str]] = None) -> None:
self._loop = asyncio.new_event_loop()
- self._async_api = NominatimAPIAsync(project_dir, environ)
+ self._async_api = NominatimAPIAsync(project_dir, environ, loop=self._loop)
def close(self) -> None:
# quoted correctly.
sqlstr = re.sub(r'%(?!\()', '%%', sqlstr)
sqlstr = re.sub(r'__\[POSTCOMPILE_([^]]*)\]', r'%(\1)s', sqlstr)
- print(sqlstr)
return sqlstr % params
class HTMLLogger(BaseLogger):
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs
-from nominatim.api.logging import log
def wrap_near_search(categories: List[Tuple[str, str]],
""" Build a simple address search for special entries where the
housenumber is the main name token.
"""
- partial_tokens: List[int] = []
- for trange in address:
- partial_tokens.extend(t.token for t in self.query.get_partials_list(trange))
+ sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any')]
- sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any'),
- dbf.FieldLookup('nameaddress_vector', partial_tokens, 'lookup_all')
- ]
+ partials = [t for trange in address
+ for t in self.query.get_partials_list(trange)]
+
+ if len(partials) != 1 or partials[0].count < 10000:
+ sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
+ [t.token for t in partials], 'lookup_all'))
+ else:
+ sdata.lookups.append(
+ dbf.FieldLookup('nameaddress_vector',
+ [t.token for t
+ in self.query.get_tokens(address[0], TokenType.WORD)],
+ 'lookup_any'))
+
+ sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, sum(t.count for t in hnrs))
be searched for. This takes into account how frequent the terms
are and tries to find a lookup that optimizes index use.
"""
- penalty = 0.0 # extra penalty currently unused
-
+ penalty = 0.0 # extra penalty
name_partials = self.query.get_partials_list(name)
- exp_name_count = min(t.count for t in name_partials)
- addr_partials = []
- for trange in address:
- addr_partials.extend(self.query.get_partials_list(trange))
+ name_tokens = [t.token for t in name_partials]
+
+ addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
addr_tokens = [t.token for t in addr_partials]
+
partials_indexed = all(t.is_indexed for t in name_partials) \
and all(t.is_indexed for t in addr_partials)
+ exp_count = min(t.count for t in name_partials)
- if (len(name_partials) > 3 or exp_name_count < 1000) and partials_indexed:
- # Lookup by name partials, use address partials to restrict results.
- lookup = [dbf.FieldLookup('name_vector',
- [t.token for t in name_partials], 'lookup_all')]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
- yield penalty, exp_name_count, lookup
+ if (len(name_partials) > 3 or exp_count < 1000) and partials_indexed:
+ yield penalty, exp_count, dbf.lookup_by_names(name_tokens, addr_tokens)
return
- exp_addr_count = min(t.count for t in addr_partials) if addr_partials else exp_name_count
- if exp_addr_count < 1000 and partials_indexed:
+ exp_count = min(exp_count, min(t.count for t in addr_partials)) \
+ if addr_partials else exp_count
+ if exp_count < 1000 and partials_indexed:
# Lookup by address partials and restrict results through name terms.
# Give this a small penalty because lookups in the address index are
# more expensive
- yield penalty + exp_addr_count/5000, exp_addr_count,\
- [dbf.FieldLookup('name_vector', [t.token for t in name_partials], 'restrict'),
- dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]
+ yield penalty + exp_count/5000, exp_count,\
+ dbf.lookup_by_addr(name_tokens, addr_tokens)
return
-    # Partial term too frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD)
- rare_names = list(filter(lambda t: t.count < 1000, name_fulls))
+ rare_names = list(filter(lambda t: t.count < 10000, name_fulls))
# At this point drop unindexed partials from the address.
# This might yield wrong results, nothing we can do about that.
if not partials_indexed:
addr_tokens = [t.token for t in addr_partials if t.is_indexed]
- log().var_dump('before', penalty)
penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
- log().var_dump('after', penalty)
if rare_names:
# Any of the full names applies with all of the partials from the address
- lookup = [dbf.FieldLookup('name_vector', [t.token for t in rare_names], 'lookup_any')]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
- yield penalty, sum(t.count for t in rare_names), lookup
+ yield penalty, sum(t.count for t in rare_names),\
+ dbf.lookup_by_any_name([t.token for t in rare_names], addr_tokens)
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
- if min(exp_name_count, exp_addr_count) < 10000:
+ if exp_count < 10000:
if all(t.is_indexed for t in name_partials):
- lookup = [dbf.FieldLookup('name_vector',
- [t.token for t in name_partials], 'lookup_all')]
+ lookup = [dbf.FieldLookup('name_vector', name_tokens, 'lookup_all')]
else:
# we don't have the partials, try with the non-rare names
- non_rare_names = [t.token for t in name_fulls if t.count >= 1000]
+ non_rare_names = [t.token for t in name_fulls if t.count >= 10000]
if not non_rare_names:
return
lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
if addr_tokens:
lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
- yield penalty + 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens)),\
- min(exp_name_count, exp_addr_count), lookup
+ penalty += 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens))
+ if len(rare_names) == len(name_fulls):
+ # if there already was a search for all full tokens,
+ # avoid this if anything has been found
+ penalty += 0.25
+ yield penalty, exp_count, lookup
def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
self.rankings.append(ranking)
else:
self.penalty += ranking.default
+
+
+def lookup_by_names(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]:
+ """ Create a lookup list where name tokens are looked up via index
+ and potential address tokens are used to restrict the search further.
+ """
+ lookup = [FieldLookup('name_vector', name_tokens, 'lookup_all')]
+ if addr_tokens:
+ lookup.append(FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
+
+ return lookup
+
+
+def lookup_by_any_name(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]:
+ """ Create a lookup list where name tokens are looked up via index
+ and only one of the name tokens must be present.
+ Potential address tokens are used to restrict the search further.
+ """
+ lookup = [FieldLookup('name_vector', name_tokens, 'lookup_any')]
+ if addr_tokens:
+ lookup.append(FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
+
+ return lookup
+
+
+def lookup_by_addr(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]:
+ """ Create a lookup list where address tokens are looked up via index
+ and the name tokens are only used to restrict the search further.
+ """
+ return [FieldLookup('name_vector', name_tokens, 'restrict'),
+ FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]
"""
Datastructures for a tokenized query.
"""
-from typing import List, Tuple, Optional, NamedTuple, Iterator
+from typing import List, Tuple, Optional, Iterator
from abc import ABC, abstractmethod
import dataclasses
import enum
category objects.
"""
-
-class TokenRange(NamedTuple):
+@dataclasses.dataclass
+class TokenRange:
""" Indexes of query nodes over which a token spans.
"""
start: int
end: int
+ def __lt__(self, other: 'TokenRange') -> bool:
+ return self.end <= other.start
+
+
+ def __le__(self, other: 'TokenRange') -> bool:
+ return NotImplemented
+
+
+ def __gt__(self, other: 'TokenRange') -> bool:
+ return self.start >= other.end
+
+
+ def __ge__(self, other: 'TokenRange') -> bool:
+ return NotImplemented
+
+
def replace_start(self, new_start: int) -> 'TokenRange':
""" Return a new token range with the new start.
"""
return True
+ def _get_assignments_postcode(self, base: TokenAssignment,
+ query_len: int) -> Iterator[TokenAssignment]:
+ """ Yield possible assignments of Postcode searches with an
+ address component.
+ """
+ assert base.postcode is not None
+
+ if (base.postcode.start == 0 and self.direction != -1)\
+ or (base.postcode.end == query_len and self.direction != 1):
+ log().comment('postcode search')
+ # <address>,<postcode> should give preference to address search
+ if base.postcode.start == 0:
+ penalty = self.penalty
+                self.direction = -1 # name searches are only possible backwards
+ else:
+ penalty = self.penalty + 0.1
+                self.direction = 1 # name searches are only possible forwards
+ yield dataclasses.replace(base, penalty=penalty)
+
+
+ def _get_assignments_address_forward(self, base: TokenAssignment,
+ query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
+ """ Yield possible assignments of address searches with
+ left-to-right reading.
+ """
+ first = base.address[0]
+
+ log().comment('first word = name')
+ yield dataclasses.replace(base, penalty=self.penalty,
+ name=first, address=base.address[1:])
+
+ # To paraphrase:
+ # * if another name term comes after the first one and before the
+ # housenumber
+ # * a qualifier comes after the name
+ # * the containing phrase is strictly typed
+ if (base.housenumber and first.end < base.housenumber.start)\
+ or (base.qualifier and base.qualifier > first)\
+ or (query.nodes[first.start].ptype != qmod.PhraseType.NONE):
+ return
+
+ penalty = self.penalty
+
+ # Penalty for:
+ # * <name>, <street>, <housenumber> , ...
+ # * queries that are comma-separated
+ if (base.housenumber and base.housenumber > first) or len(query.source) > 1:
+ penalty += 0.25
+
+ for i in range(first.start + 1, first.end):
+ name, addr = first.split(i)
+ log().comment(f'split first word = name ({i - first.start})')
+ yield dataclasses.replace(base, name=name, address=[addr] + base.address[1:],
+ penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
+
+
+ def _get_assignments_address_backward(self, base: TokenAssignment,
+ query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
+ """ Yield possible assignments of address searches with
+ right-to-left reading.
+ """
+ last = base.address[-1]
+
+ if self.direction == -1 or len(base.address) > 1:
+ log().comment('last word = name')
+ yield dataclasses.replace(base, penalty=self.penalty,
+ name=last, address=base.address[:-1])
+
+ # To paraphrase:
+ # * if another name term comes before the last one and after the
+ # housenumber
+ # * a qualifier comes before the name
+ # * the containing phrase is strictly typed
+ if (base.housenumber and last.start > base.housenumber.end)\
+ or (base.qualifier and base.qualifier < last)\
+ or (query.nodes[last.start].ptype != qmod.PhraseType.NONE):
+ return
+
+ penalty = self.penalty
+ if base.housenumber and base.housenumber < last:
+ penalty += 0.4
+ if len(query.source) > 1:
+ penalty += 0.25
+
+ for i in range(last.start + 1, last.end):
+ addr, name = last.split(i)
+ log().comment(f'split last word = name ({i - last.start})')
+ yield dataclasses.replace(base, name=name, address=base.address[:-1] + [addr],
+ penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
+
+
def get_assignments(self, query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
""" Yield possible assignments for the current sequence.
"""
base = TokenAssignment.from_ranges(self.seq)
+ num_addr_tokens = sum(t.end - t.start for t in base.address)
+ if num_addr_tokens > 50:
+ return
+
# Postcode search (postcode-only search is covered in next case)
if base.postcode is not None and base.address:
- if (base.postcode.start == 0 and self.direction != -1)\
- or (base.postcode.end == query.num_token_slots() and self.direction != 1):
- log().comment('postcode search')
- # <address>,<postcode> should give preference to address search
- if base.postcode.start == 0:
- penalty = self.penalty
- else:
- penalty = self.penalty + 0.1
- yield dataclasses.replace(base, penalty=penalty)
+ yield from self._get_assignments_postcode(base, query.num_token_slots())
# Postcode or country-only search
if not base.address:
# <postcode>,<address> should give preference to postcode search
if base.postcode and base.postcode.start == 0:
self.penalty += 0.1
- # Use entire first word as name
+
+ # Right-to-left reading of the address
if self.direction != -1:
- log().comment('first word = name')
- yield dataclasses.replace(base, name=base.address[0],
- penalty=self.penalty,
- address=base.address[1:])
-
- # Use entire last word as name
- if self.direction == -1 or (self.direction == 0 and len(base.address) > 1):
- log().comment('last word = name')
- yield dataclasses.replace(base, name=base.address[-1],
- penalty=self.penalty,
- address=base.address[:-1])
+ yield from self._get_assignments_address_forward(base, query)
+
+ # Left-to-right reading of the address
+ if self.direction != 1:
+ yield from self._get_assignments_address_backward(base, query)
# variant for special housenumber searches
if base.housenumber:
yield dataclasses.replace(base, penalty=self.penalty)
- # Use beginning of first word as name
- if self.direction != -1:
- first = base.address[0]
- if (not base.housenumber or first.end >= base.housenumber.start)\
- and (not base.qualifier or first.start >= base.qualifier.end):
- for i in range(first.start + 1, first.end):
- name, addr = first.split(i)
- penalty = self.penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype]
- log().comment(f'split first word = name ({i - first.start})')
- yield dataclasses.replace(base, name=name, penalty=penalty,
- address=[addr] + base.address[1:])
-
- # Use end of last word as name
- if self.direction != 1:
- last = base.address[-1]
- if (not base.housenumber or last.start <= base.housenumber.end)\
- and (not base.qualifier or last.end <= base.qualifier.start):
- for i in range(last.start + 1, last.end):
- addr, name = last.split(i)
- penalty = self.penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype]
- log().comment(f'split last word = name ({i - last.start})')
- yield dataclasses.replace(base, name=name, penalty=penalty,
- address=base.address[:-1] + [addr])
-
-
def yield_token_assignments(query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
""" Return possible word type assignments to word positions.
"""
Output formatters for API version v1.
"""
-from typing import Mapping, Any
+from typing import List, Dict, Mapping, Any
import collections
+import datetime as dt
import nominatim.api as napi
from nominatim.api.result_formatting import FormatDispatcher
from nominatim.api.v1 import format_json, format_xml
from nominatim.utils.json_writer import JsonWriter
+class RawDataList(List[Dict[str, Any]]):
+ """ Data type for formatting raw data lists 'as is' in json.
+ """
+
dispatch = FormatDispatcher()
@dispatch.format_func(napi.StatusResult, 'text')
options: Mapping[str, Any]) -> str:
return format_json.format_base_json(results, options, False,
class_label='category')
+
+@dispatch.format_func(RawDataList, 'json')
+def _format_raw_data_json(results: RawDataList, _: Mapping[str, Any]) -> str:
+ out = JsonWriter()
+ out.start_array()
+ for res in results:
+ out.start_object()
+ for k, v in res.items():
+ if isinstance(v, dt.datetime):
+ out.keyval(k, v.isoformat(sep= ' ', timespec='seconds'))
+ else:
+ out.keyval(k, v)
+ out.end_object().next()
+
+ out.end_array()
+
+ return out()
import math
from urllib.parse import urlencode
+import sqlalchemy as sa
+
from nominatim.errors import UsageError
from nominatim.config import Configuration
import nominatim.api as napi
import nominatim.api.logging as loglib
from nominatim.api.v1.format import dispatch as formatting
+from nominatim.api.v1.format import RawDataList
from nominatim.api.v1 import helpers
CONTENT_TYPE = {
return params.build_response(output)
+async def deletable_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+ """ Server glue for /deletable endpoint.
+ This is a special endpoint that shows polygons that have been
+ deleted or are broken in the OSM data but are kept in the
+ Nominatim database to minimize disruption.
+ """
+ fmt = params.parse_format(RawDataList, 'json')
+
+ async with api.begin() as conn:
+ sql = sa.text(""" SELECT p.place_id, country_code,
+ name->'name' as name, i.*
+ FROM placex p, import_polygon_delete i
+ WHERE p.osm_id = i.osm_id AND p.osm_type = i.osm_type
+ AND p.class = i.class AND p.type = i.type
+ """)
+ results = RawDataList(r._asdict() for r in await conn.execute(sql))
+
+ return params.build_response(formatting.format_result(results, fmt, {}))
+
+
+async def polygons_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
+ """ Server glue for /polygons endpoint.
+ This is a special endpoint that shows polygons that have changed
+ thier size but are kept in the Nominatim database with their
+ old area to minimize disruption.
+ """
+ fmt = params.parse_format(RawDataList, 'json')
+ sql_params: Dict[str, Any] = {
+ 'days': params.get_int('days', -1),
+ 'cls': params.get('class')
+ }
+ reduced = params.get_bool('reduced', False)
+
+ async with api.begin() as conn:
+ sql = sa.select(sa.text("""osm_type, osm_id, class, type,
+ name->'name' as name,
+ country_code, errormessage, updated"""))\
+ .select_from(sa.text('import_polygon_error'))
+ if sql_params['days'] > 0:
+ sql = sql.where(sa.text("updated > 'now'::timestamp - make_interval(days => :days)"))
+ if reduced:
+ sql = sql.where(sa.text("errormessage like 'Area reduced%'"))
+ if sql_params['cls'] is not None:
+ sql = sql.where(sa.text("class = :cls"))
+
+ sql = sql.order_by(sa.literal_column('updated').desc()).limit(1000)
+
+ results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
+
+ return params.build_response(formatting.format_result(results, fmt, {}))
+
+
EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]
ROUTES = [
('details', details_endpoint),
('reverse', reverse_endpoint),
('lookup', lookup_endpoint),
- ('search', search_endpoint)
+ ('search', search_endpoint),
+ ('deletable', deletable_endpoint),
+ ('polygons', polygons_endpoint),
]
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Command-line interface to the Nominatim functions for import, update,
database administration and querying.
"""
-from typing import Optional, Any, List, Union
+from typing import Optional, Any
import importlib
import logging
import os
from pathlib import Path
from nominatim.config import Configuration
-from nominatim.tools.exec_utils import run_legacy_script, run_php_server
+from nominatim.tools.exec_utils import run_php_server
from nominatim.errors import UsageError
from nominatim import clicmd
from nominatim import version
self.parser.print_help()
return 1
- args.phpcgi_path = Path(kwargs['phpcgi_path'])
args.project_dir = Path(args.project_dir).resolve()
if 'cli_args' not in kwargs:
#
# No need to document the functions each time.
# pylint: disable=C0111
-class QueryExport:
- """\
- Export addresses as CSV file from the database.
- """
-
- def add_args(self, parser: argparse.ArgumentParser) -> None:
- group = parser.add_argument_group('Output arguments')
- group.add_argument('--output-type', default='street',
- choices=('continent', 'country', 'state', 'county',
- 'city', 'suburb', 'street', 'path'),
- help='Type of places to output (default: street)')
- group.add_argument('--output-format',
- default='street;suburb;city;county;state;country',
- help=("Semicolon-separated list of address types "
- "(see --output-type). Multiple ranks can be "
- "merged into one column by simply using a "
- "comma-separated list."))
- group.add_argument('--output-all-postcodes', action='store_true',
- help=("List all postcodes for address instead of "
- "just the most likely one"))
- group.add_argument('--language',
- help=("Preferred language for output "
- "(use local name, if omitted)"))
- group = parser.add_argument_group('Filter arguments')
- group.add_argument('--restrict-to-country', metavar='COUNTRY_CODE',
- help='Export only objects within country')
- group.add_argument('--restrict-to-osm-node', metavar='ID', type=int,
- help='Export only children of this OSM node')
- group.add_argument('--restrict-to-osm-way', metavar='ID', type=int,
- help='Export only children of this OSM way')
- group.add_argument('--restrict-to-osm-relation', metavar='ID', type=int,
- help='Export only children of this OSM relation')
-
-
- def run(self, args: NominatimArgs) -> int:
- params: List[Union[int, str]] = [
- '--output-type', args.output_type,
- '--output-format', args.output_format]
- if args.output_all_postcodes:
- params.append('--output-all-postcodes')
- if args.language:
- params.extend(('--language', args.language))
- if args.restrict_to_country:
- params.extend(('--restrict-to-country', args.restrict_to_country))
- if args.restrict_to_osm_node:
- params.extend(('--restrict-to-osm-node', args.restrict_to_osm_node))
- if args.restrict_to_osm_way:
- params.extend(('--restrict-to-osm-way', args.restrict_to_osm_way))
- if args.restrict_to_osm_relation:
- params.extend(('--restrict-to-osm-relation', args.restrict_to_osm_relation))
-
- return run_legacy_script('export.php', *params, config=args.config)
-
-
class AdminServe:
"""\
Start a simple web server for serving the API.
parser.add_subcommand('admin', clicmd.AdminFuncs())
- parser.add_subcommand('export', QueryExport())
+ parser.add_subcommand('export', clicmd.QueryExport())
parser.add_subcommand('serve', AdminServe())
parser.add_subcommand('search', clicmd.APISearch())
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Subcommand definitions for the command-line tool.
from nominatim.clicmd.admin import AdminFuncs as AdminFuncs
from nominatim.clicmd.freeze import SetupFreeze as SetupFreeze
from nominatim.clicmd.special_phrases import ImportSpecialPhrases as ImportSpecialPhrases
+from nominatim.clicmd.export import QueryExport as QueryExport
"""
import logging
import argparse
+import random
-from nominatim.tools.exec_utils import run_legacy_script
+from nominatim.db.connection import connect
from nominatim.clicmd.args import NominatimArgs
+import nominatim.api as napi
# Do not repeat documentation of subcommand classes.
# pylint: disable=C0111
return 1
+
def _warm(self, args: NominatimArgs) -> int:
LOG.warning('Warming database caches')
- params = ['warm.php']
- if args.target == 'reverse':
- params.append('--reverse-only')
- if args.target == 'search':
- params.append('--search-only')
- return run_legacy_script(*params, config=args.config)
+
+ api = napi.NominatimAPI(args.project_dir)
+
+ try:
+ if args.target != 'reverse':
+ for _ in range(1000):
+ api.reverse((random.uniform(-90, 90), random.uniform(-180, 180)),
+ address_details=True)
+
+ if args.target != 'search':
+ from ..tokenizer import factory as tokenizer_factory
+
+ tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
+ with connect(args.config.get_libpq_dsn()) as conn:
+ words = tokenizer.most_frequent_words(conn, 1000)
+
+ for word in words:
+ api.search(word)
+ finally:
+ api.close()
+
+ return 0
"""
Subcommand definitions for API calls from the command line.
"""
-from typing import Mapping, Dict, Any
+from typing import Dict, Any
import argparse
import logging
import json
import sys
-from nominatim.tools.exec_utils import run_api_script
-from nominatim.errors import UsageError
from nominatim.clicmd.args import NominatimArgs
import nominatim.api as napi
import nominatim.api.v1 as api_output
"Parameter is difference tolerance in degrees."))
-def _run_api(endpoint: str, args: NominatimArgs, params: Mapping[str, object]) -> int:
- script_file = args.project_dir / 'website' / (endpoint + '.php')
-
- if not script_file.exists():
- LOG.error("Cannot find API script file.\n\n"
- "Make sure to run 'nominatim' from the project directory \n"
- "or use the option --project-dir.")
- raise UsageError("API script not found.")
-
- return run_api_script(endpoint, args.project_dir,
- phpcgi_bin=args.phpcgi_path, params=params)
-
class APISearch:
"""\
Execute a search query.
# Basic environment set by root program.
config: Configuration
project_dir: Path
- phpcgi_path: Path
# Global switches
version: bool
output_all_postcodes: bool
language: Optional[str]
restrict_to_country: Optional[str]
- restrict_to_osm_node: Optional[int]
- restrict_to_osm_way: Optional[int]
- restrict_to_osm_relation: Optional[int]
# Arguments to 'refresh'
postcodes: bool
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Implementation of the 'export' subcommand.
+"""
+from typing import Optional, List, cast
+import logging
+import argparse
+import asyncio
+import csv
+import sys
+
+import sqlalchemy as sa
+
+from nominatim.clicmd.args import NominatimArgs
+import nominatim.api as napi
+from nominatim.api.results import create_from_placex_row, ReverseResult, add_result_details
+from nominatim.api.types import LookupDetails
+from nominatim.errors import UsageError
+
+# Do not repeat documentation of subcommand classes.
+# pylint: disable=C0111
+# Using non-top-level imports to avoid eventually unused imports.
+# pylint: disable=E0012,C0415
+# Needed for SQLAlchemy
+# pylint: disable=singleton-comparison
+
+LOG = logging.getLogger()
+
+RANK_RANGE_MAP = {
+ 'country': (4, 4),
+ 'state': (5, 9),
+ 'county': (10, 12),
+ 'city': (13, 16),
+ 'suburb': (17, 21),
+ 'street': (26, 26),
+ 'path': (27, 27)
+}
+
+RANK_TO_OUTPUT_MAP = {
+ 4: 'country',
+ 5: 'state', 6: 'state', 7: 'state', 8: 'state', 9: 'state',
+ 10: 'county', 11: 'county', 12: 'county',
+ 13: 'city', 14: 'city', 15: 'city', 16: 'city',
+ 17: 'suburb', 18: 'suburb', 19: 'suburb', 20: 'suburb', 21: 'suburb',
+ 26: 'street', 27: 'path'}
+
+class QueryExport:
+ """\
+ Export places as CSV file from the database.
+
+
+ """
+
+ def add_args(self, parser: argparse.ArgumentParser) -> None:
+ group = parser.add_argument_group('Output arguments')
+ group.add_argument('--output-type', default='street',
+ choices=('country', 'state', 'county',
+ 'city', 'suburb', 'street', 'path'),
+ help='Type of places to output (default: street)')
+ group.add_argument('--output-format',
+ default='street;suburb;city;county;state;country',
+ help=("Semicolon-separated list of address types "
+ "(see --output-type). Additionally accepts:"
+ "placeid,postcode"))
+ group.add_argument('--language',
+ help=("Preferred language for output "
+ "(use local name, if omitted)"))
+ group = parser.add_argument_group('Filter arguments')
+ group.add_argument('--restrict-to-country', metavar='COUNTRY_CODE',
+ help='Export only objects within country')
+ group.add_argument('--restrict-to-osm-node', metavar='ID', type=int,
+ dest='node',
+ help='Export only children of this OSM node')
+ group.add_argument('--restrict-to-osm-way', metavar='ID', type=int,
+ dest='way',
+ help='Export only children of this OSM way')
+ group.add_argument('--restrict-to-osm-relation', metavar='ID', type=int,
+ dest='relation',
+ help='Export only children of this OSM relation')
+
+
+ def run(self, args: NominatimArgs) -> int:
+ return asyncio.run(export(args))
+
+
+async def export(args: NominatimArgs) -> int:
+    """ The actual export as an asynchronous function.
+ """
+
+ api = napi.NominatimAPIAsync(args.project_dir)
+
+ try:
+ output_range = RANK_RANGE_MAP[args.output_type]
+
+ writer = init_csv_writer(args.output_format)
+
+ async with api.begin() as conn, api.begin() as detail_conn:
+ t = conn.t.placex
+
+ sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
+ t.c.class_, t.c.type, t.c.admin_level,
+ t.c.address, t.c.extratags,
+ t.c.housenumber, t.c.postcode, t.c.country_code,
+ t.c.importance, t.c.wikipedia, t.c.indexed_date,
+ t.c.rank_address, t.c.rank_search,
+ t.c.centroid)\
+ .where(t.c.linked_place_id == None)\
+ .where(t.c.rank_address.between(*output_range))
+
+ parent_place_id = await get_parent_id(conn, args.node, args.way, args.relation)
+ if parent_place_id:
+ taddr = conn.t.addressline
+
+ sql = sql.join(taddr, taddr.c.place_id == t.c.place_id)\
+ .where(taddr.c.address_place_id == parent_place_id)\
+ .where(taddr.c.isaddress)
+
+ if args.restrict_to_country:
+ sql = sql.where(t.c.country_code == args.restrict_to_country.lower())
+
+ results = []
+ for row in await conn.execute(sql):
+ result = create_from_placex_row(row, ReverseResult)
+ if result is not None:
+ results.append(result)
+
+ if len(results) == 1000:
+ await dump_results(detail_conn, results, writer, args.language)
+ results = []
+
+ if results:
+ await dump_results(detail_conn, results, writer, args.language)
+ finally:
+ await api.close()
+
+ return 0
+
+
+def init_csv_writer(output_format: str) -> 'csv.DictWriter[str]':
+ fields = output_format.split(';')
+ writer = csv.DictWriter(sys.stdout, fieldnames=fields, extrasaction='ignore')
+ writer.writeheader()
+
+ return writer
+
+
+async def dump_results(conn: napi.SearchConnection,
+ results: List[ReverseResult],
+ writer: 'csv.DictWriter[str]',
+ lang: Optional[str]) -> None:
+ await add_result_details(conn, results,
+ LookupDetails(address_details=True))
+
+
+ locale = napi.Locales([lang] if lang else None)
+
+ for result in results:
+ data = {'placeid': result.place_id,
+ 'postcode': result.postcode}
+
+ result.localize(locale)
+ for line in (result.address_rows or []):
+ if line.isaddress and line.local_name:
+ if line.category[1] == 'postcode':
+ data['postcode'] = line.local_name
+ elif line.rank_address in RANK_TO_OUTPUT_MAP:
+ data[RANK_TO_OUTPUT_MAP[line.rank_address]] = line.local_name
+
+ writer.writerow(data)
+
+
+async def get_parent_id(conn: napi.SearchConnection, node_id: Optional[int],
+ way_id: Optional[int],
+ relation_id: Optional[int]) -> Optional[int]:
+ """ Get the place ID for the given OSM object.
+ """
+ if node_id is not None:
+ osm_type, osm_id = 'N', node_id
+ elif way_id is not None:
+ osm_type, osm_id = 'W', way_id
+ elif relation_id is not None:
+ osm_type, osm_id = 'R', relation_id
+ else:
+ return None
+
+ t = conn.t.placex
+ sql = sa.select(t.c.place_id).limit(1)\
+ .where(t.c.osm_type == osm_type)\
+ .where(t.c.osm_id == osm_id)\
+ .where(t.c.rank_address > 0)\
+ .order_by(t.c.rank_address)
+
+ for result in await conn.execute(sql):
+ return cast(int, result[0])
+
+ raise UsageError(f'Cannot find a place {osm_type}{osm_id}.')
from pathlib import Path
from nominatim.config import Configuration
+from nominatim.db.connection import Connection
from nominatim.data.place_info import PlaceInfo
from nominatim.typing import Protocol
"""
+ @abstractmethod
+ def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
+ """ Return a list of the `num` most frequent full words
+ in the database.
+ """
+
+
class TokenizerModule(Protocol):
""" Interface that must be exported by modules that implement their
own tokenizer.
self.loader.make_token_analysis())
+ def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
+ """ Return a list of the `num` most frequent full words
+ in the database.
+ """
+ with conn.cursor() as cur:
+ cur.execute("""SELECT word, sum((info->>'count')::int) as count
+ FROM word WHERE type = 'W'
+ GROUP BY word
+ ORDER BY count DESC LIMIT %s""", (num,))
+ return list(s[0].split('@')[0] for s in cur)
+
+
def _install_php(self, phpdir: Path, overwrite: bool = True) -> None:
""" Install the php script for the tokenizer.
"""
return LegacyNameAnalyzer(self.dsn, normalizer)
+ def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
+ """ Return a list of the `num` most frequent full words
+ in the database.
+ """
+ with conn.cursor() as cur:
+ cur.execute(""" SELECT word FROM word WHERE word is not null
+ ORDER BY search_name_count DESC LIMIT %s""", (num,))
+ return list(s[0] for s in cur)
+
+
def _install_php(self, config: Configuration, overwrite: bool = True) -> None:
""" Install the php script for the tokenizer.
"""
"""
Helper functions for executing external programs.
"""
-from typing import Any, Union, Optional, Mapping, IO
-from pathlib import Path
+from typing import Any, Mapping, IO
import logging
import os
import subprocess
import urllib.request as urlrequest
-from urllib.parse import urlencode
-from nominatim.config import Configuration
from nominatim.typing import StrPath
from nominatim.version import NOMINATIM_VERSION
from nominatim.db.connection import get_pg_env
LOG = logging.getLogger()
-def run_legacy_script(script: StrPath, *args: Union[int, str],
- config: Configuration,
- throw_on_fail: bool = False) -> int:
- """ Run a Nominatim PHP script with the given arguments.
-
- Returns the exit code of the script. If `throw_on_fail` is True
- then throw a `CalledProcessError` on a non-zero exit.
- """
- cmd = ['/usr/bin/env', 'php', '-Cq',
- str(config.lib_dir.php / 'admin' / script)]
- cmd.extend([str(a) for a in args])
-
- env = config.get_os_env()
- env['NOMINATIM_DATADIR'] = str(config.lib_dir.data)
- env['NOMINATIM_SQLDIR'] = str(config.lib_dir.sql)
- env['NOMINATIM_CONFIGDIR'] = str(config.config_dir)
- env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str(config.lib_dir.module)
- if not env['NOMINATIM_OSM2PGSQL_BINARY']:
- env['NOMINATIM_OSM2PGSQL_BINARY'] = str(config.lib_dir.osm2pgsql)
-
- proc = subprocess.run(cmd, cwd=str(config.project_dir), env=env,
- check=throw_on_fail)
-
- return proc.returncode
-
-def run_api_script(endpoint: str, project_dir: Path,
- extra_env: Optional[Mapping[str, str]] = None,
- phpcgi_bin: Optional[Path] = None,
- params: Optional[Mapping[str, Any]] = None) -> int:
- """ Execute a Nominatim API function.
-
- The function needs a project directory that contains the website
- directory with the scripts to be executed. The scripts will be run
- using php_cgi. Query parameters can be added as named arguments.
-
- Returns the exit code of the script.
- """
- log = logging.getLogger()
- webdir = str(project_dir / 'website')
- query_string = urlencode(params or {})
-
- env = dict(QUERY_STRING=query_string,
- SCRIPT_NAME=f'/{endpoint}.php',
- REQUEST_URI=f'/{endpoint}.php?{query_string}',
- CONTEXT_DOCUMENT_ROOT=webdir,
- SCRIPT_FILENAME=f'{webdir}/{endpoint}.php',
- HTTP_HOST='localhost',
- HTTP_USER_AGENT='nominatim-tool',
- REMOTE_ADDR='0.0.0.0',
- DOCUMENT_ROOT=webdir,
- REQUEST_METHOD='GET',
- SERVER_PROTOCOL='HTTP/1.1',
- GATEWAY_INTERFACE='CGI/1.1',
- REDIRECT_STATUS='CGI')
-
- if extra_env:
- env.update(extra_env)
-
- if phpcgi_bin is None:
- cmd = ['/usr/bin/env', 'php-cgi']
- else:
- cmd = [str(phpcgi_bin)]
-
- proc = subprocess.run(cmd, cwd=str(project_dir), env=env,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- check=False)
-
- if proc.returncode != 0 or proc.stderr:
- if proc.stderr:
- log.error(proc.stderr.decode('utf-8').replace('\\n', '\n'))
- else:
- log.error(proc.stdout.decode('utf-8').replace('\\n', '\n'))
- return proc.returncode or 1
-
- result = proc.stdout.decode('utf-8')
- content_start = result.find('\r\n\r\n')
-
- print(result[content_start + 4:].replace('\\n', '\n'))
-
- return 0
-
-
def run_php_server(server_address: str, base_dir: StrPath) -> None:
""" Run the built-in server from the given directory.
"""
cli.nominatim(module_dir='',
osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
cli_args=cmdline,
- phpcgi_path='',
environ=self.test_env)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Provides dummy implementations of ASGIAdaptor for testing.
+"""
+from collections import namedtuple
+
+import nominatim.api.v1.server_glue as glue
+from nominatim.config import Configuration
+
+class FakeError(BaseException):
+
+ def __init__(self, msg, status):
+ self.msg = msg
+ self.status = status
+
+ def __str__(self):
+ return f'{self.status} -- {self.msg}'
+
+FakeResponse = namedtuple('FakeResponse', ['status', 'output', 'content_type'])
+
+class FakeAdaptor(glue.ASGIAdaptor):
+
+ def __init__(self, params=None, headers=None, config=None):
+ self.params = params or {}
+ self.headers = headers or {}
+ self._config = config or Configuration(None)
+
+
+ def get(self, name, default=None):
+ return self.params.get(name, default)
+
+
+ def get_header(self, name, default=None):
+ return self.headers.get(name, default)
+
+
+ def error(self, msg, status=400):
+ return FakeError(msg, status)
+
+
+ def create_response(self, status, output):
+ return FakeResponse(status, output, self.content_type)
+
+
+ def config(self):
+ return self._config
+
--- /dev/null
+
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Test data types for search queries.
+"""
+import pytest
+
+import nominatim.api.search.query as nq
+
+def test_token_range_equal():
+ assert nq.TokenRange(2, 3) == nq.TokenRange(2, 3)
+ assert not (nq.TokenRange(2, 3) != nq.TokenRange(2, 3))
+
+
+@pytest.mark.parametrize('lop,rop', [((1, 2), (3, 4)),
+ ((3, 4), (3, 5)),
+ ((10, 12), (11, 12))])
+def test_token_range_unequal(lop, rop):
+ assert not (nq.TokenRange(*lop) == nq.TokenRange(*rop))
+ assert nq.TokenRange(*lop) != nq.TokenRange(*rop)
+
+
+def test_token_range_lt():
+ assert nq.TokenRange(1, 3) < nq.TokenRange(10, 12)
+ assert nq.TokenRange(5, 6) < nq.TokenRange(7, 8)
+ assert nq.TokenRange(1, 4) < nq.TokenRange(4, 5)
+ assert not(nq.TokenRange(5, 6) < nq.TokenRange(5, 6))
+ assert not(nq.TokenRange(10, 11) < nq.TokenRange(4, 5))
+
+
+def test_token_range_gt():
+ assert nq.TokenRange(3, 4) > nq.TokenRange(1, 2)
+ assert nq.TokenRange(100, 200) > nq.TokenRange(10, 11)
+ assert nq.TokenRange(10, 11) > nq.TokenRange(4, 10)
+ assert not(nq.TokenRange(5, 6) > nq.TokenRange(5, 6))
+ assert not(nq.TokenRange(1, 2) > nq.TokenRange(3, 4))
+ assert not(nq.TokenRange(4, 10) > nq.TokenRange(3, 5))
+
+
+def test_token_range_unimplemented_ops():
+ with pytest.raises(TypeError):
+ nq.TokenRange(1, 3) <= nq.TokenRange(10, 12)
+ with pytest.raises(TypeError):
+ nq.TokenRange(1, 3) >= nq.TokenRange(10, 12)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for the deletable v1 API call.
+"""
+import json
+from pathlib import Path
+
+import pytest
+import pytest_asyncio
+
+import psycopg2.extras
+
+from fake_adaptor import FakeAdaptor, FakeError, FakeResponse
+
+import nominatim.api.v1.server_glue as glue
+import nominatim.api as napi
+
+@pytest_asyncio.fixture
+async def api():
+ api = napi.NominatimAPIAsync(Path('/invalid'))
+ yield api
+ await api.close()
+
+
+class TestDeletableEndPoint:
+
+ @pytest.fixture(autouse=True)
+ def setup_deletable_table(self, temp_db_cursor, table_factory, temp_db_with_extensions):
+ psycopg2.extras.register_hstore(temp_db_cursor)
+ table_factory('import_polygon_delete',
+ definition='osm_id bigint, osm_type char(1), class text, type text',
+ content=[(345, 'N', 'boundary', 'administrative'),
+ (781, 'R', 'landuse', 'wood'),
+ (781, 'R', 'landcover', 'grass')])
+ table_factory('placex',
+ definition="""place_id bigint, osm_id bigint, osm_type char(1),
+ class text, type text, name HSTORE, country_code char(2)""",
+ content=[(1, 345, 'N', 'boundary', 'administrative', {'old_name': 'Former'}, 'ab'),
+ (2, 781, 'R', 'landuse', 'wood', {'name': 'Wood'}, 'cd'),
+ (3, 781, 'R', 'landcover', 'grass', None, 'cd')])
+
+
+
+ @pytest.mark.asyncio
+ async def test_deletable(self, api):
+ a = FakeAdaptor()
+
+ resp = await glue.deletable_endpoint(api, a)
+ results = json.loads(resp.output)
+
+ results.sort(key=lambda r: r['place_id'])
+
+ assert results == [{'place_id': 1, 'country_code': 'ab', 'name': None,
+ 'osm_id': 345, 'osm_type': 'N',
+ 'class': 'boundary', 'type': 'administrative'},
+ {'place_id': 2, 'country_code': 'cd', 'name': 'Wood',
+ 'osm_id': 781, 'osm_type': 'R',
+ 'class': 'landuse', 'type': 'wood'},
+ {'place_id': 3, 'country_code': 'cd', 'name': None,
+ 'osm_id': 781, 'osm_type': 'R',
+ 'class': 'landcover', 'type': 'grass'}]
+
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for the polygons v1 API call.
+"""
+import json
+import datetime as dt
+from pathlib import Path
+
+import pytest
+import pytest_asyncio
+
+import psycopg2.extras
+
+from fake_adaptor import FakeAdaptor, FakeError, FakeResponse
+
+import nominatim.api.v1.server_glue as glue
+import nominatim.api as napi
+
+@pytest_asyncio.fixture
+async def api():
+ api = napi.NominatimAPIAsync(Path('/invalid'))
+ yield api
+ await api.close()
+
+
+class TestPolygonsEndPoint:
+
+ @pytest.fixture(autouse=True)
+ def setup_deletable_table(self, temp_db_cursor, table_factory, temp_db_with_extensions):
+ psycopg2.extras.register_hstore(temp_db_cursor)
+
+ self.now = dt.datetime.now()
+ self.recent = dt.datetime.now() - dt.timedelta(days=3)
+
+ table_factory('import_polygon_error',
+ definition="""osm_id bigint,
+ osm_type character(1),
+ class text,
+ type text,
+ name hstore,
+ country_code character varying(2),
+ updated timestamp without time zone,
+ errormessage text,
+ prevgeometry geometry(Geometry,4326),
+ newgeometry geometry(Geometry,4326)""",
+ content=[(345, 'N', 'boundary', 'administrative',
+ {'name': 'Foo'}, 'xx', self.recent,
+ 'some text', None, None),
+ (781, 'R', 'landuse', 'wood',
+ None, 'ds', self.now,
+ 'Area reduced by lots', None, None)])
+
+
+ @pytest.mark.asyncio
+ async def test_polygons_simple(self, api):
+ a = FakeAdaptor()
+
+ resp = await glue.polygons_endpoint(api, a)
+ results = json.loads(resp.output)
+
+ results.sort(key=lambda r: (r['osm_type'], r['osm_id']))
+
+ assert results == [{'osm_type': 'N', 'osm_id': 345,
+ 'class': 'boundary', 'type': 'administrative',
+ 'name': 'Foo', 'country_code': 'xx',
+ 'errormessage': 'some text',
+ 'updated': self.recent.isoformat(sep=' ', timespec='seconds')},
+ {'osm_type': 'R', 'osm_id': 781,
+ 'class': 'landuse', 'type': 'wood',
+ 'name': None, 'country_code': 'ds',
+ 'errormessage': 'Area reduced by lots',
+ 'updated': self.now.isoformat(sep=' ', timespec='seconds')}]
+
+
+ @pytest.mark.asyncio
+ async def test_polygons_days(self, api):
+ a = FakeAdaptor()
+ a.params['days'] = '2'
+
+ resp = await glue.polygons_endpoint(api, a)
+ results = json.loads(resp.output)
+
+ assert [r['osm_id'] for r in results] == [781]
+
+
+ @pytest.mark.asyncio
+ async def test_polygons_class(self, api):
+ a = FakeAdaptor()
+ a.params['class'] = 'landuse'
+
+ resp = await glue.polygons_endpoint(api, a)
+ results = json.loads(resp.output)
+
+ assert [r['osm_id'] for r in results] == [781]
+
+
+
+ @pytest.mark.asyncio
+ async def test_polygons_reduced(self, api):
+ a = FakeAdaptor()
+ a.params['reduced'] = '1'
+
+ resp = await glue.polygons_endpoint(api, a)
+ results = json.loads(resp.output)
+
+ assert [r['osm_id'] for r in results] == [781]
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for export CLI function.
+"""
+import pytest
+
+import nominatim.cli
+
+@pytest.fixture
+def run_export(tmp_path, capsys):
+ def _exec(args):
+ assert 0 == nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
+ osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
+ cli_args=['export', '--project-dir', str(tmp_path)]
+ + args)
+ return capsys.readouterr().out.split('\r\n')
+
+ return _exec
+
+
+@pytest.fixture(autouse=True)
+def setup_database_with_context(apiobj):
+ apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
+ class_='highway', type='residential', name='Street',
+ country_code='pl', postcode='55674',
+ rank_search=27, rank_address=26)
+ apiobj.add_address_placex(332, fromarea=False, isaddress=False,
+ distance=0.0034,
+ place_id=1000, osm_type='N', osm_id=3333,
+ class_='place', type='suburb', name='Smallplace',
+ country_code='pl', admin_level=13,
+ rank_search=24, rank_address=23)
+ apiobj.add_address_placex(332, fromarea=True, isaddress=True,
+ place_id=1001, osm_type='N', osm_id=3334,
+ class_='place', type='city', name='Bigplace',
+ country_code='pl',
+ rank_search=17, rank_address=16)
+
+
+def test_export_default(run_export):
+ csv = run_export([])
+
+ assert csv == ['street,suburb,city,county,state,country', 'Street,,Bigplace,,,', '']
+
+
+def test_export_output_type(run_export):
+ csv = run_export(['--output-type', 'city'])
+
+ assert csv == ['street,suburb,city,county,state,country', ',,Bigplace,,,', '']
+
+
+def test_export_output_format(run_export):
+ csv = run_export(['--output-format', 'placeid;street;nothing;postcode'])
+
+ assert csv == ['placeid,street,nothing,postcode', '332,Street,,55674', '']
+
+
+def test_export_restrict_to_node_good(run_export):
+ csv = run_export(['--restrict-to-osm-node', '3334'])
+
+ assert csv == ['street,suburb,city,county,state,country', 'Street,,Bigplace,,,', '']
+
+
+def test_export_restrict_to_node_not_address(run_export):
+ csv = run_export(['--restrict-to-osm-node', '3333'])
+
+ assert csv == ['street,suburb,city,county,state,country', '']
"""
Tests for the Python web frameworks adaptor, v1 API.
"""
-from collections import namedtuple
import json
import xml.etree.ElementTree as ET
from pathlib import Path
import pytest
-from nominatim.config import Configuration
+from fake_adaptor import FakeAdaptor, FakeError, FakeResponse
+
import nominatim.api.v1.server_glue as glue
import nominatim.api as napi
import nominatim.api.logging as loglib
-class FakeError(BaseException):
-
- def __init__(self, msg, status):
- self.msg = msg
- self.status = status
-
- def __str__(self):
- return f'{self.status} -- {self.msg}'
-
-FakeResponse = namedtuple('FakeResponse', ['status', 'output', 'content_type'])
-
-class FakeAdaptor(glue.ASGIAdaptor):
-
- def __init__(self, params=None, headers=None, config=None):
- self.params = params or {}
- self.headers = headers or {}
- self._config = config or Configuration(None)
-
-
- def get(self, name, default=None):
- return self.params.get(name, default)
-
-
- def get_header(self, name, default=None):
- return self.headers.get(name, default)
-
-
- def error(self, msg, status=400):
- return FakeError(msg, status)
-
-
- def create_response(self, status, output):
- return FakeResponse(status, output, self.content_type)
-
-
- def config(self):
- return self._config
-
# ASGIAdaptor.get_int/bool()
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for warm-up CLI function.
+"""
+import pytest
+
+import nominatim.cli
+
+@pytest.fixture(autouse=True)
+def setup_database_with_context(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
+ content=[(55, 'test', 'W', 'test', None),
+ (2, 'test', 'w', 'test', None)])
+
+ apiobj.add_data('properties',
+ [{'property': 'tokenizer', 'value': 'icu'},
+ {'property': 'tokenizer_import_normalisation', 'value': ':: lower();'},
+ {'property': 'tokenizer_import_transliteration', 'value': "'1' > '/1/'; 'ä' > 'ä '"},
+ ])
+
+
+@pytest.mark.parametrize('args', [['--search-only'], ['--reverse-only']])
+def test_warm_all(tmp_path, args):
+ assert 0 == nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
+ osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
+ cli_args=['admin', '--project-dir', str(tmp_path),
+ '--warm'] + args)
@pytest.fixture
-def cli_call(src_dir):
+def cli_call():
""" Call the nominatim main function with the correct paths set.
Returns a function that can be called with the desired CLI arguments.
"""
def _call_nominatim(*args):
return nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
- phpcgi_path='/usr/bin/php-cgi',
cli_args=args)
return _call_nominatim
-@pytest.fixture
-def mock_run_legacy(monkeypatch):
- mock = MockParamCapture()
- monkeypatch.setattr(nominatim.cli, 'run_legacy_script', mock)
- return mock
-
-
@pytest.fixture
def mock_func_factory(monkeypatch):
def get_mock(module, func):
assert func.last_kwargs['host'] == '127.0.0.1'
assert func.last_kwargs['port'] == 8088
-def test_cli_export_command(cli_call, mock_run_legacy):
- assert cli_call('export', '--output-all-postcodes') == 0
-
- assert mock_run_legacy.called == 1
- assert mock_run_legacy.last_args[0] == 'export.php'
-
-
-@pytest.mark.parametrize("param,value", [('output-type', 'country'),
- ('output-format', 'street;city'),
- ('language', 'xf'),
- ('restrict-to-country', 'us'),
- ('restrict-to-osm-node', '536'),
- ('restrict-to-osm-way', '727'),
- ('restrict-to-osm-relation', '197532')
- ])
-def test_export_parameters(src_dir, tmp_path, param, value, monkeypatch):
- (tmp_path / 'admin').mkdir()
- (tmp_path / 'admin' / 'export.php').write_text(f"""<?php
- exit(strpos(implode(' ', $_SERVER['argv']), '--{param} {value}') >= 0 ? 0 : 10);
- """)
-
- monkeypatch.setattr(nominatim.paths, 'PHPLIB_DIR', tmp_path)
-
- assert nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
- osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
- phpcgi_path='/usr/bin/php-cgi',
- cli_args=['export', '--' + param, value]) == 0
-
-
class TestCliWithDb:
import nominatim.clicmd.admin
-@pytest.mark.parametrize("params", [('--warm', ),
- ('--warm', '--reverse-only'),
- ('--warm', '--search-only')])
-def test_admin_command_legacy(cli_call, mock_func_factory, params):
- mock_run_legacy = mock_func_factory(nominatim.clicmd.admin, 'run_legacy_script')
-
- assert cli_call('admin', *params) == 0
-
- assert mock_run_legacy.called == 1
-
-
def test_admin_command_check_database(cli_call, mock_func_factory):
mock = mock_func_factory(nominatim.tools.check_database, 'check_database')
import nominatim.tools.exec_utils as exec_utils
import nominatim.paths
-class TestRunLegacyScript:
-
- @pytest.fixture(autouse=True)
- def setup_nominatim_env(self, tmp_path, monkeypatch):
- tmp_phplib_dir = tmp_path / 'phplib'
- tmp_phplib_dir.mkdir()
- (tmp_phplib_dir / 'admin').mkdir()
-
- monkeypatch.setattr(nominatim.paths, 'PHPLIB_DIR', tmp_phplib_dir)
-
- self.phplib_dir = tmp_phplib_dir
- self.config = Configuration(tmp_path)
- self.config.set_libdirs(module='.', osm2pgsql='default_osm2pgsql',
- php=tmp_phplib_dir)
-
-
- def mk_script(self, code):
- codefile = self.phplib_dir / 'admin' / 't.php'
- codefile.write_text('<?php\n' + code + '\n')
-
- return 't.php'
-
-
- @pytest.mark.parametrize("return_code", (0, 1, 15, 255))
- def test_run_legacy_return_exit_code(self, return_code):
- fname = self.mk_script('exit({});'.format(return_code))
- assert return_code == \
- exec_utils.run_legacy_script(fname, config=self.config)
-
-
- def test_run_legacy_return_throw_on_fail(self):
- fname = self.mk_script('exit(11);')
- with pytest.raises(subprocess.CalledProcessError):
- exec_utils.run_legacy_script(fname, config=self.config,
- throw_on_fail=True)
-
-
- def test_run_legacy_return_dont_throw_on_success(self):
- fname = self.mk_script('exit(0);')
- assert exec_utils.run_legacy_script(fname, config=self.config,
- throw_on_fail=True) == 0
-
- def test_run_legacy_use_given_module_path(self):
- fname = self.mk_script("exit($_SERVER['NOMINATIM_DATABASE_MODULE_PATH'] == '' ? 0 : 23);")
-
- assert exec_utils.run_legacy_script(fname, config=self.config) == 0
-
-
- def test_run_legacy_do_not_overwrite_module_path(self, monkeypatch):
- monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', 'other')
- fname = self.mk_script(
- "exit($_SERVER['NOMINATIM_DATABASE_MODULE_PATH'] == 'other' ? 0 : 1);")
-
- assert exec_utils.run_legacy_script(fname, config=self.config) == 0
-
-
- def test_run_legacy_default_osm2pgsql_binary(self, monkeypatch):
- fname = self.mk_script("exit($_SERVER['NOMINATIM_OSM2PGSQL_BINARY'] == 'default_osm2pgsql' ? 0 : 23);")
-
- assert exec_utils.run_legacy_script(fname, config=self.config) == 0
-
-
- def test_run_legacy_override_osm2pgsql_binary(self, monkeypatch):
- monkeypatch.setenv('NOMINATIM_OSM2PGSQL_BINARY', 'somethingelse')
-
- fname = self.mk_script("exit($_SERVER['NOMINATIM_OSM2PGSQL_BINARY'] == 'somethingelse' ? 0 : 23);")
-
- assert exec_utils.run_legacy_script(fname, config=self.config) == 0
-
-
-class TestRunApiScript:
-
- @staticmethod
- @pytest.fixture(autouse=True)
- def setup_project_dir(tmp_path):
- webdir = tmp_path / 'website'
- webdir.mkdir()
- (webdir / 'test.php').write_text("<?php\necho 'OK\n';")
-
-
- @staticmethod
- def test_run_api(tmp_path):
- assert exec_utils.run_api_script('test', tmp_path) == 0
-
- @staticmethod
- def test_run_api_execution_error(tmp_path):
- assert exec_utils.run_api_script('badname', tmp_path) != 0
-
- @staticmethod
- def test_run_api_with_extra_env(tmp_path):
- extra_env = dict(SCRIPT_FILENAME=str(tmp_path / 'website' / 'test.php'))
- assert exec_utils.run_api_script('badname', tmp_path, extra_env=extra_env) == 0
-
- @staticmethod
- def test_custom_phpcgi(tmp_path, capfd):
- assert exec_utils.run_api_script('test', tmp_path, phpcgi_bin='env',
- params={'q' : 'Berlin'}) == 0
- captured = capfd.readouterr()
-
- assert '?q=Berlin' in captured.out
-
- @staticmethod
- def test_fail_on_error_output(tmp_path):
- # Starting PHP 8 the PHP CLI no longer has STDERR defined as constant
- php = """
- <?php
- if(!defined('STDERR')) define('STDERR', fopen('php://stderr', 'wb'));
- fwrite(STDERR, 'WARNING'.PHP_EOL);
- """
- (tmp_path / 'website' / 'bad.php').write_text(php)
-
- assert exec_utils.run_api_script('bad', tmp_path) == 1
### run_osm2pgsql
# Now you can install all packages needed for Nominatim:
- sudo apt install -y php-cgi
sudo apt install -y build-essential cmake g++ libboost-dev libboost-system-dev \
libboost-filesystem-dev libexpat1-dev zlib1g-dev \
libbz2-dev libpq-dev liblua5.3-dev lua5.3 lua-dkjson \
# Now you can install all packages needed for Nominatim:
- sudo apt install -y php-cgi
sudo apt install -y build-essential cmake g++ libboost-dev libboost-system-dev \
libboost-filesystem-dev libexpat1-dev zlib1g-dev \
libbz2-dev libpq-dev liblua5.3-dev lua5.3 lua-dkjson \