#!/usr/bin/env python3
"""Publish data about OSM relations referenced on the OSM wiki as a Lua module.

Steps:

1. Find all calls of the ``list relations`` template on the OSM wiki and
   collect the relation ids they mention.
2. Query the Overpass API (and Wikidata for lengths) about those relations.
3. Serialize the result as Lua and either print it or, with ``--update``,
   write it to ``Module:Report/data`` on the wiki.
"""
import argparse
import json
import re
import sys
from typing import Collection

import pywikiapi
import mwparserfromhell
import requests
import luadata
import tomli
from SPARQLWrapper import SPARQLWrapper, JSON

USER_AGENT = 'osmwiki-overpass-import'
OSMWIKI_ENDPOINT = 'https://wiki.openstreetmap.org/w/api.php'
OVERPASS_ENDPOINT = 'https://lz4.overpass-api.de/api/interpreter'
WIKIDATA_ENDPOINT = 'https://query.wikidata.org/sparql'

# Seconds to wait for the Overpass server before giving up on a request.
OVERPASS_TIMEOUT = 300

# Shape of a Wikidata item id; anything else must not be spliced into SPARQL.
WIKIDATA_ID_RE = re.compile(r'Q[1-9][0-9]*')

osmwiki = pywikiapi.Site(OSMWIKI_ENDPOINT)

parser = argparse.ArgumentParser()
parser.add_argument('--update', action='store_true')
args = parser.parse_args()


def _warn(message):
    """Print a warning to stderr so that stdout only carries the Lua output."""
    print(f'[warning] {message}', file=sys.stderr)


def find_template_calls(template_name):
    """Yield ``(page, template)`` for every call of ``template_name``.

    Pages are found via the ``embeddedin`` generator of the wiki API; the
    latest revision of each page is parsed and every template whose name
    matches ``template_name`` is yielded together with its page.
    """
    for page in osmwiki.query_pages(
        generator='embeddedin',
        geititle='Template:' + template_name,
        prop='revisions',
        rvprop='content',
        rvslots='main',
    ):
        doc = mwparserfromhell.parse(page['revisions'][0]['slots']['main']['content'])
        for template in doc.filter_templates():
            if template.name.matches(template_name):
                yield page, template


# step 1: find relation ids by looking for template calls

# Maps relation id -> whether any template call listing it asked for lengths.
relation_ids = {}

for page, tpl in find_template_calls('list relations'):
    if not tpl.has(1):
        continue
    for arg in tpl.get(1).split():
        if arg.isdigit():
            rel_id = int(arg)
            # Look up the int key (not the raw string) so that a length
            # request from an earlier template call is not forgotten.
            relation_ids[rel_id] = relation_ids.get(rel_id, False) or tpl.has('length')
        else:
            # TODO: only if verbose output is enabled
            _warn(f'found unexpected argument "{arg}" in {page["title"]}')

# step 2: query overpass turbo about relations

sess = requests.session()
sess.headers['user-agent'] = USER_AGENT

relations = {}
overpass_info = None


def get_length(route, relations, _ancestors=frozenset()):
    """Return the length of ``route``, summing over member relations.

    The length operator of Overpass QL does not support superrelations
    (relations containing relations) and just returns 0 for those, so we
    need to implement our own recursion.

    ``route`` is an element as prepared by ``query_overpass``: ``members``
    is the list of member relation ids and ``tags['length']`` holds the
    length computed by Overpass. ``relations`` maps ids to such elements.
    ``_ancestors`` is internal and tracks the current recursion path.

    Members that are missing from ``relations`` (e.g. nested deeper than the
    query recursed) or that would form a cycle are skipped with a warning
    and contribute nothing to the sum.
    """
    member_ids = route['members']
    if not member_ids:
        return float(route['tags']['length'])

    # If there are any members of type relation we recurse,
    # ensuring that we still report a sensible result when
    # ways are mistakenly added directly to superrelations.
    path = _ancestors | {route['id']}
    total = 0.0
    for member_id in member_ids:
        member = relations.get(member_id)
        if member is None:
            _warn(f'member relation {member_id} of {route["id"]} was not fetched')
        elif member_id in path:
            _warn(f'relation {member_id} contains itself (via {route["id"]})')
        else:
            total += get_length(member, relations, path)
    return total


def query_lengths_from_wikidata(wikidata_ids):
    """Return ``{wikidata_id: length}`` for the given Wikidata item ids.

    The length is the normalized quantity amount of property P2043;
    presumably metres, since callers divide by 1000 -- TODO confirm.
    Items without such a statement are absent from the result.

    Ids come from OSM tags and are therefore untrusted: values that do not
    look like an item id are skipped with a warning instead of being spliced
    into the query, where they would break it.
    """
    valid_ids = set()
    for wikidata_id in wikidata_ids:
        if WIKIDATA_ID_RE.fullmatch(wikidata_id):
            valid_ids.add(wikidata_id)
        else:
            _warn(f'ignoring malformed wikidata id "{wikidata_id}"')

    if not valid_ids:
        # Nothing to ask; avoid a pointless network round trip.
        return {}

    query = """SELECT ?id ?length WHERE {
  VALUES ?id { %s }
  ?id p:P2043/psn:P2043/wikibase:quantityAmount ?length .
}""" % ' '.join(f'wd:{x}' for x in sorted(valid_ids))

    sparql = SPARQLWrapper(WIKIDATA_ENDPOINT, agent=USER_AGENT)
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)

    return {
        # The binding is the entity URI; keep only the trailing item id.
        res['id']['value'].rsplit('/', maxsplit=1)[1]: float(res['length']['value'])
        for res in sparql.query().convert()["results"]["bindings"]
    }


def query_overpass(relation_ids: Collection[int], with_length: bool):
    """Fetch the given relations from Overpass and record them in ``relations``.

    For every relation found, the tags listed in ``KEYS`` are stored under
    ``relations[rel_id]['tags']``. With ``with_length`` the mapped length
    (Overpass length / 1000, rounded to one decimal) is stored as
    ``mapped_length`` and, if Wikidata knows a length for the relation's
    ``wikidata`` tag, that is stored as ``wikidata_length``.

    Also updates the global ``overpass_info`` with the ``osm3s`` metadata of
    the response. Does nothing when ``relation_ids`` is empty.
    """
    global overpass_info

    if not relation_ids:
        # An empty union is not worth a request.
        return

    # TODO: split query if it gets too large
    parts = ['[out:json]; (']
    for rel_id in relation_ids:
        parts.append(f'relation({rel_id});')
        if with_length:
            # Each rel(r) reads the result of the statement before it, so
            # the recursion has to follow every relation individually.
            # NOTE(review): placement inside the loop was reconstructed
            # from flattened source -- confirm.
            parts.append('rel(r);')  # 1. recursion
            parts.append('rel(r);')  # 2. recursion
            parts.append('rel(r);')  # 3. recursion
    parts.append(');')
    if with_length:
        parts.append(
            'convert result ::=::, ::id=id(), length=length(), member_ids=per_member(ref()), member_types=per_member(mtype());'
        )
    parts.append('out body;')
    query = ''.join(parts)

    # POST keeps long id lists out of the URL, which has a length limit.
    res = sess.post(OVERPASS_ENDPOINT, data=dict(data=query), timeout=OVERPASS_TIMEOUT)
    res.raise_for_status()

    KEYS = ('name', 'name:en', 'wikidata', 'wikipedia', 'website')

    res = res.json()
    if 'remark' in res:
        # Overpass reports runtime problems (timeouts etc.) in a remark
        # while still answering with HTTP 200 and partial data.
        _warn(f'overpass remark: {res["remark"]}')

    all_relations = {rel['id']: rel for rel in res['elements']}
    for rel in all_relations.values():
        # Untagged elements come without a "tags" key.
        rel.setdefault('tags', {})

    wikidata_lengths = {}

    if with_length:
        for rel in all_relations.values():
            member_ids = rel['tags'].get('member_ids', '').split(';')
            member_types = rel['tags'].get('member_types', '').split(';')
            # Only relation members matter for the recursion; converting
            # just those also copes with the empty list of a memberless
            # relation.
            rel['members'] = [
                int(m_id)
                for m_id, m_type in zip(member_ids, member_types)
                if m_type == 'relation'
            ]

        def wikidata_ids():
            for rel_id in relation_ids:
                rel = all_relations.get(rel_id)
                if rel is None:
                    # Reported as missing further below.
                    continue
                wikidata_id = rel['tags'].get('wikidata')
                if wikidata_id:
                    yield wikidata_id

        # NOTE(review): Wikidata lengths are only fetched for batches with
        # lengths enabled; nesting was reconstructed from flattened source
        # -- confirm.
        wikidata_lengths = query_lengths_from_wikidata(wikidata_ids())

    for rel_id in relation_ids:
        rel = all_relations.get(rel_id)
        if not rel:
            # TODO: report 404 in generated data
            _warn(f'could not find relation {rel_id}')
            continue
        data = dict(tags={k: v for k, v in rel['tags'].items() if k in KEYS})
        if with_length:
            data['mapped_length'] = round(get_length(rel, all_relations) / 1000, 1)
            wikidata_id = rel['tags'].get('wikidata')
            if wikidata_id in wikidata_lengths:
                data['wikidata_length'] = round(wikidata_lengths[wikidata_id] / 1000, 1)
        relations[rel_id] = data

    overpass_info = res['osm3s']


query_overpass([rel_id for rel_id, length in relation_ids.items() if not length], False)
query_overpass([rel_id for rel_id, length in relation_ids.items() if length], True)

if overpass_info is None:
    # No relation ids were found at all, so Overpass was never queried.
    # Publishing an empty table would wipe the existing data.
    sys.exit('[error] no relation ids found on the wiki; nothing to publish')

# step 3: serialize data as Lua

text = f'''\
-- This page is automatically generated by a Python script using the Overpass API.
-- {overpass_info['copyright']}
-- The wikidata_length data is queried from www.wikidata.org and available under Creative Commons CC0 License.

return ''' + luadata.serialize(
    dict(relations=relations)
)

if args.update:
    with open('credentials.toml', 'rb') as f:
        creds = tomli.load(f)
    osmwiki.login(creds['username'], creds['password'])
    csrf_token = osmwiki('query', meta='tokens')['query']['tokens']['csrftoken']
    osmwiki(
        'edit',
        title='Module:Report/data',
        text=text,
        token=csrf_token,
        summary=f"update (osm_base = {overpass_info['timestamp_osm_base']})",
    )
else:
    print(text)