1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
#!/usr/bin/env python3
import argparse
import sys
import json
from collections.abc import Collection
import pywikiapi
import mwparserfromhell
import requests
import luadata
import tomli
# MediaWiki API endpoint of the OpenStreetMap wiki.
OSMWIKI_ENDPOINT = 'https://wiki.openstreetmap.org/w/api.php'
# Overpass API interpreter endpoint that the relation queries are sent to.
OVERPASS_ENDPOINT = 'https://lz4.overpass-api.de/api/interpreter'
# Wiki API client shared by the whole script (page queries, login, edit).
osmwiki = pywikiapi.Site(OSMWIKI_ENDPOINT)
# Command line handling: with --update the generated text is written to the
# wiki at the end of the script; without it the text is printed instead.
parser = argparse.ArgumentParser()
parser.add_argument('--update', action='store_true')
args = parser.parse_args()
def find_template_calls(template_name):
    """
    Yield a ``(page, template)`` pair for every call of ``template_name``.

    Pages are obtained from the wiki API's ``embeddedin`` generator for
    ``Template:<template_name>``. For each page the main-slot content of the
    first returned revision is parsed, and every template whose name matches
    ``template_name`` is yielded together with the page it was found on.
    """
    embedding_pages = osmwiki.query_pages(
        generator='embeddedin',
        geititle='Template:' + template_name,
        prop='revisions',
        rvprop='content',
        rvslots='main',
    )
    for page in embedding_pages:
        wikitext = page['revisions'][0]['slots']['main']['content']
        parsed = mwparserfromhell.parse(wikitext)
        yield from (
            (page, call)
            for call in parsed.filter_templates()
            if call.name.matches(template_name)
        )
# step 1: find relation ids by looking for template calls
# Maps relation id (int) -> True if at least one template call that lists the
# relation also has a ``length`` parameter, False otherwise.
relation_ids = {}
for page, tpl in find_template_calls('list relations'):
    if not tpl.has(1):
        continue
    wants_length = tpl.has('length')
    # Split the parameter's *value*: the string form of a parameter written
    # with an explicit key (``1=...``) includes the ``1=`` prefix, which
    # would otherwise end up glued to the first token.
    for arg in tpl.get(1).value.split():
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as superscript digits that int() cannot parse.
        if arg.isdecimal():
            rel_id = int(arg)
            # Look the flag up with the same int key it is stored under, so
            # a relation once requested with length keeps that flag even if
            # a later call lists it without one.
            relation_ids[rel_id] = relation_ids.get(rel_id, False) or wants_length
        else:
            # TODO: only if verbose output is enabled
            print(
                f'[warning] found unexpected argument "{arg}" in {page["title"]}',
                file=sys.stderr,
            )
# step 2: query the Overpass API about relations
# Module-level holders that query_overpass fills in: the per-relation data
# and the ``osm3s`` metadata of the most recent response.
overpass_info = None
relations = {}
# One HTTP session, identifying this script through its user agent.
sess = requests.Session()
sess.headers.update({'user-agent': 'osmwiki-overpass-bridge'})
def get_length(route, relations, _ancestors=frozenset()):
    """
    Return the length of ``route`` as a float, recursing into member relations.

    The length operator of Overpass QL does not support superrelations
    (relations containing relations) and just returns 0 for those,
    so we need to implement our own recursion.

    ``route`` is a dict with a ``members`` list (ids of its member
    relations) and a ``tags`` dict whose ``length`` entry can be converted
    to float. ``relations`` maps relation ids to dicts of the same shape.
    ``_ancestors`` is internal bookkeeping for the recursion (ids of the
    relations currently being expanded); callers should not pass it.

    A member that is missing from ``relations``, or that would close a
    membership cycle, is reported on stderr and contributes nothing to the
    sum instead of aborting with KeyError / RecursionError.
    """
    member_ids = route['members']
    if not member_ids:
        return float(route['tags']['length'])

    # If there are any members of type relation we recurse,
    # ensuring that we still report a sensible result when
    # ways are mistakenly added directly to superrelations.
    # Only the current expansion path is tracked, so a relation reachable
    # through several members is still counted once per occurrence.
    path = _ancestors | {route.get('id')}
    total = 0.0
    for member_id in member_ids:
        if member_id in path:
            print(
                f'[warning] relation {member_id} is a member of itself '
                '(directly or indirectly), ignoring the cyclic membership',
                file=sys.stderr,
            )
            continue
        member = relations.get(member_id)
        if member is None:
            print(
                f'[warning] member relation {member_id} was not returned by '
                'Overpass, ignoring its length',
                file=sys.stderr,
            )
            continue
        total += get_length(member, relations, path)
    return total
def query_overpass(relation_ids: Collection[int], with_length: bool):
    """
    Fetch the given relations from Overpass and record them in ``relations``.

    For every id in ``relation_ids`` that Overpass returns, an entry
    ``{'tags': {...}}`` restricted to the keys in ``KEYS`` is stored in the
    module-level ``relations`` dict. With ``with_length`` the member
    relations are fetched as well (three levels deep) and a
    ``mapped_length`` computed by ``get_length`` is added to the entry.
    The ``osm3s`` metadata of the response is stored in the module-level
    ``overpass_info``.

    Ids that are missing from the response are reported on stderr and
    skipped. Raises ``requests.HTTPError`` if Overpass answers with an
    error status.
    """
    global overpass_info

    KEYS = ('name', 'name:en', 'wikidata', 'wikipedia', 'website')

    # TODO: split query if it gets too large
    parts = ['[out:json]; (']
    parts.extend(f'relation({rel_id});' for rel_id in relation_ids)
    parts.append(');')
    if with_length:
        # Each ``(._;rel(r););`` adds one more level of member relations to
        # the whole set collected so far. The recursion has to live outside
        # the id union: a statement inside a union reads the default set
        # left by the statement before it, so a bare ``rel(r);`` after the
        # id list would only descend from the last listed relation.
        parts.append('(._;rel(r););' * 3)
        parts.append(
            'convert result ::=::, ::id=id(), length=length(), member_ids=per_member(ref()), member_types=per_member(mtype());'
        )
    parts.append('out body;')
    query = ''.join(parts)

    response = sess.get(OVERPASS_ENDPOINT, params=dict(data=query))
    response.raise_for_status()
    payload = response.json()
    all_relations = {rel['id']: rel for rel in payload['elements']}

    if with_length:
        for rel in all_relations.values():
            # A relation without members yields an empty (or possibly
            # absent) list tag; converting only the ids of relation-typed
            # members keeps int('') from ever being attempted.
            member_ids = rel['tags'].get('member_ids', '').split(';')
            member_types = rel['tags'].get('member_types', '').split(';')
            rel['members'] = [
                int(m_id)
                for m_id, m_type in zip(member_ids, member_types)
                if m_type == 'relation'
            ]

    for rel_id in relation_ids:
        rel = all_relations.get(rel_id)
        if rel is None:
            # The id came from a wiki page; one stale id should not abort
            # the whole run.
            print(
                f'[warning] relation {rel_id} was not returned by Overpass',
                file=sys.stderr,
            )
            continue
        data = dict(tags={k: v for k, v in rel['tags'].items() if k in KEYS})
        if with_length:
            data['mapped_length'] = get_length(rel, all_relations)
        relations[rel_id] = data

    overpass_info = payload['osm3s']
# Two requests: first the relations that need no length, then the ones for
# which a length was requested (those also pull in their member relations).
ids_without_length = [rel_id for rel_id, length in relation_ids.items() if not length]
ids_with_length = [rel_id for rel_id, length in relation_ids.items() if length]
query_overpass(ids_without_length, False)
query_overpass(ids_with_length, True)
# step 3: serialize data as Lua
# Header comment carrying the Overpass attribution and data timestamp,
# followed by the Lua table that the module returns.
header = f'''\
-- This page is automatically generated by a Python script using the Overpass API.
-- {overpass_info['copyright']}
-- osm_base = {overpass_info['timestamp_osm_base']}
return '''
text = header + luadata.serialize({'relations': relations})

if not args.update:
    # Default mode: just show the generated module source.
    print(text)
else:
    # --update: log in with the stored credentials and overwrite the module page.
    with open('credentials.toml', 'rb') as creds_file:
        creds = tomli.load(creds_file)
    osmwiki.login(creds['username'], creds['password'])
    token_reply = osmwiki('query', meta='tokens')
    csrf_token = token_reply['query']['tokens']['csrftoken']
    osmwiki('edit', title='Module:Report/data', text=text, token=csrf_token)
|