#!/usr/bin/env python3
"""Map client IP addresses to AWS regions using a GeoLite2 country database.

Reads one IP address per line from stdin and writes one AWS region per line
to stdout, flushing after every answer. Any address that cannot be resolved
(malformed input, missing or unreadable database, unknown address) yields
``DEFAULT_REGION``.
"""
import sys
import os
import geoip2.database
import ipaddress
from typing import Optional

# Constants
# NOTE(review): the "<%= ... %>" part looks like a template placeholder that
# is rendered before deployment -- keep it byte-for-byte.
DB_PATH = "<%= node[:geoipupdate][:directory] %>/GeoLite2-Country.mmdb"

# Default region when continent doesn't match any in the dictionary
DEFAULT_REGION = "eu-central-1"

# Mapping of continents to AWS regions. Continent codes not listed here
# (and a missing code, i.e. None) fall back to DEFAULT_REGION.
CONTINENT_TO_AWS_REGION = {
    "NA": "us-west-2",  # North America
    "OC": "us-west-2",  # Oceania
    "SA": "us-west-2",  # South America
}

# Globals holding the modification time of the database file the current
# reader was opened from, and the reader itself.
last_mod_time = None
reader = None


def is_valid_ip(ip_str: str) -> bool:
    """Check if a string is a valid IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    return True


def get_reader():
    """Get the geoip2 database reader, reloading it if the DB file changed.

    Returns:
        The cached ``geoip2.database.Reader``, or ``None`` when the database
        file cannot be stat'ed (e.g. it is missing).

    Raises:
        Whatever ``geoip2.database.Reader`` raises when the file exists but
        cannot be opened. In that case the previously cached reader, if any,
        is left open and untouched.
    """
    global last_mod_time
    global reader

    # A single getmtime() call replaces the exists()+getmtime() pair, so a
    # file that disappears between the two calls can no longer raise here.
    try:
        current_mod_time = os.path.getmtime(DB_PATH)
    except OSError:
        return None  # Database file missing or inaccessible

    # If file has changed or reader isn't initialized, reload it
    if reader is None or current_mod_time != last_mod_time:
        # Open the replacement first: if this raises, the global still
        # refers to a usable reader instead of an already-closed one.
        new_reader = geoip2.database.Reader(DB_PATH)
        if reader is not None:
            reader.close()
        reader = new_reader
        last_mod_time = current_mod_time

    return reader


def get_continent_from_ip(ip_address: str) -> Optional[str]:
    """Return the continent code for a given IP address.

    Returns ``None`` when the address is not a valid IP, the database is
    unavailable, or the lookup fails for any other reason.
    """
    if not is_valid_ip(ip_address):
        return None

    try:
        # Acquiring the reader is inside the guarded region so that a
        # database that fails to open degrades to "unknown" instead of
        # terminating the stdin loop.
        db_reader = get_reader()
        if db_reader is None:
            return None  # No continent as DB is missing
        return db_reader.country(ip_address).continent.code
    except Exception:
        # Deliberately broad best-effort (address not in DB, unreadable DB,
        # ...), but unlike a bare ``except:`` this lets KeyboardInterrupt
        # and SystemExit propagate.
        return None


def determine_aws_region(continent_code: Optional[str]) -> str:
    """Determine AWS region based on the continent code using a dictionary."""
    return CONTINENT_TO_AWS_REGION.get(continent_code, DEFAULT_REGION)


def main() -> None:
    """Process IP addresses from stdin and write one AWS region per line."""
    for line in sys.stdin:
        ip_address = line.strip()
        continent_code = get_continent_from_ip(ip_address)
        aws_region = determine_aws_region(continent_code)
        sys.stdout.write(f"{aws_region}\n")
        # Flush after every answer so a consumer reading line by line is
        # not left waiting on a buffered response.
        sys.stdout.flush()


if __name__ == "__main__":
    main()