Rework opendata import script

* Homogenize the handling of opendata files.
* Use a clear distinction between geometry (the position of the report)
and geo_shape (the shape of the report area).
* Keep track of OpenData sources, fix #51.
* Keep track of the area covered by a given report.
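
The geometry/geo_shape split can be illustrated with shapely, which the
script already uses. A minimal sketch (the polygon below is made up for
illustration):

    import json
    from shapely.geometry import mapping, shape

    # Hypothetical work area (GeoJSON) covering a stretch of roadway
    geo_shape = {
        'type': 'Polygon',
        'coordinates': [[[3.0, 50.6], [3.1, 50.6], [3.1, 50.7], [3.0, 50.6]]]
    }
    area = shape(geo_shape)           # report area, kept as shape_geojson
    position = area.centroid         # report position (the "geometry")
    print(position.x, position.y)    # lng, lat shown on the map
    print(json.dumps(mapping(area)))  # serialized form stored in the db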
Lucas Verney 2018-10-30 16:58:57 +01:00
parent 20dd75b152
commit c5b64cfc72
4 changed files with 266 additions and 121 deletions

@@ -31,6 +31,10 @@ def run_migration():
'report', 'source',
peewee.CharField(max_length=255)
),
migrator.add_column(
'report', 'shape_geojson',
peewee.TextField(default=None, null=True)
),
)
query = Report.select()
for report in query:
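
For context, add_column above is presumably the playhouse.migrate API
bundled with peewee. A self-contained sketch of the same migration (the
database backend and path are assumptions):

    import peewee
    from playhouse.migrate import SqliteMigrator, migrate

    db = peewee.SqliteDatabase('reports.db')  # path is hypothetical
    migrator = SqliteMigrator(db)             # backend assumed to be SQLite

    migrate(
        migrator.add_column(
            'report', 'shape_geojson',
            peewee.TextField(default=None, null=True)
        )
    )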

@@ -2,6 +2,7 @@
"""
Import French opendata about works on roads.
"""
import json
import logging
import os
import sys
@@ -15,7 +16,8 @@ import requests
from functools import partial
from shapely.geometry import Point, shape
from shapely.geometry import LineString, MultiPolygon, Point, Polygon
from shapely.geometry import mapping, shape
from shapely.ops import transform
from server.models import db, Report
@@ -34,17 +36,34 @@ REPORT_DOWNVOTES_THRESHOLD = 1
def preprocess_lille(data):
out = []
for item in data:
try:
fields = item['fields']
new_item = {
'fields': fields,
'geometry': {
'type': 'Point',
# Use lng, lat
'coordinates': fields['geo_point_2d'][::-1]
},
'recordid': item['recordid'],
'source': 'opendata-lille'
}
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = new_item['geometry']
# Homogenize start date spelling
fields['date_debut'] = fields['date_demarrage']
# Homogenize geo shape
fields['geo_shape'] = fields['geometry']
new_fields['date_debut'] = new_fields['date_demarrage']
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Lille data: %s.', exc)
logging.warning(
'Invalid item %s in Lille data: %s.',
item.get('recordid', '?'),
exc
)
continue
return data
return out
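
All the preprocess_* helpers converge on one normalized item layout,
which the rest of the script relies on. An illustrative item for Lille
(all values made up):

    new_item = {
        'fields': {
            'date_debut': '2018-10-30',   # homogenized start date
            'geo_shape': {                # homogenized area of the works
                'type': 'Point',
                'coordinates': [3.07, 50.63]
            }
        },
        'geometry': {                     # position of the report (lng, lat)
            'type': 'Point',
            'coordinates': [3.07, 50.63]
        },
        'recordid': '0123456789abcdef',   # stable id within the source
        'source': 'opendata-lille'        # provenance, stored with the report
    }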
def preprocess_loiret(data):
@@ -59,35 +78,41 @@ def preprocess_loiret(data):
[item['geometry']['x'], item['geometry']['y']]
]
]
# In Loiret, an item with multiple paths yields multiple LineStrings
for path in paths:
if len(path) == 1:
geometry = {
geo_shape = {
'type': 'Point',
'coordinates': path[0]
}
else:
geometry = {
geo_shape = {
'type': 'LineString',
'coordinates': path
}
new_item = {
'fields': item['attributes'],
'geometry': geometry,
'recordid': item['attributes']['OBJECTID']
'geometry': shape(geo_shape).centroid,
'recordid': item['attributes']['OBJECTID'],
'source': 'opendata-loiret'
}
fields = new_item['fields']
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = geo_shape
# Homogenize start and end date spelling
fields['date_debut'] = arrow.get(
float(fields['STARTDATE']) / 1000
new_fields['date_debut'] = arrow.get(
float(new_fields['STARTDATE']) / 1000
).isoformat()
fields['date_fin'] = arrow.get(
float(fields['ENDDATE']) / 1000
new_fields['date_fin'] = arrow.get(
float(new_fields['ENDDATE']) / 1000
).isoformat()
# Homogenize geo shape
fields['geo_shape'] = geometry
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Loiret data: %s.', exc)
logging.warning(
'Invalid item %s in Loiret data: %s.',
item.get('attributes', {}).get('OBJECTID', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
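
The ArcGIS-backed feeds (Loiret, Nancy, Versailles) ship dates as epoch
milliseconds, hence the division by 1000 before handing the value to
arrow. For instance (timestamp made up):

    import arrow

    startdate = 1540900800000  # epoch milliseconds, as in STARTDATE
    print(arrow.get(startdate / 1000).isoformat())
    # 2018-10-30T12:00:00+00:00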
@@ -100,18 +125,23 @@ def preprocess_lyon(data):
try:
new_item = {
'fields': item['properties'],
'geometry': item['geometry'],
'recordid': item['properties']['identifiant']
'geometry': shape(item['geometry']).centroid,
'recordid': item['properties']['identifiant'],
'source': 'opendata-lyon'
}
fields = new_item['fields']
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = item['geometry']
# Homogenize start date and end date spelling
fields['date_debut'] = fields['debutchantier']
fields['date_fin'] = fields['finchantier']
# Homogenize geo shape
fields['geo_shape'] = item['geometry']
new_fields['date_debut'] = new_fields['debutchantier']
new_fields['date_fin'] = new_fields['finchantier']
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Lyon data: %s.', exc)
logging.warning(
'Invalid item %s in Lyon data: %s.',
item.get('properties', {}).get('identifiant', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
@@ -124,18 +154,23 @@ def preprocess_montpellier(data):
try:
new_item = {
'fields': item['properties'],
'geometry': item['geometry'],
'recordid': item['properties']['numero']
'geometry': shape(item['geometry']).centroid,
'recordid': item['properties']['numero'],
'source': 'opendata-montpellier'
}
fields = new_item['fields']
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = item['geometry']
# Homogenize start date and end date spelling
fields['date_debut'] = fields['datedebut']
fields['date_fin'] = fields['datefin']
# Homogenize geo shape
fields['geo_shape'] = item['geometry']
new_fields['date_debut'] = new_fields['datedebut']
new_fields['date_fin'] = new_fields['datefin']
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Montpellier data: %s.', exc)
logging.warning(
'Invalid item %s in Montpellier data: %s.',
item.get('properties', {}).get('numero', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
@@ -156,21 +191,49 @@ def preprocess_nancy(data):
new_item = {
'fields': item['attributes'],
'geometry': geometry,
'recordid': item['attributes']['OBJECTID']
'recordid': item['attributes']['OBJECTID'],
'source': 'opendata-nancy'
}
fields = new_item['fields']
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = geometry
# Homogenize start and end date spelling
fields['date_debut'] = arrow.get(
float(fields['DATE_DEBUT']) / 1000
new_fields['date_debut'] = arrow.get(
float(new_fields['DATE_DEBUT']) / 1000
).isoformat()
fields['date_fin'] = arrow.get(
float(fields['DATE_FIN']) / 1000
new_fields['date_fin'] = arrow.get(
float(new_fields['DATE_FIN']) / 1000
).isoformat()
# Homogenize geo shape
fields['geo_shape'] = geometry
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Nancy data: %s.', exc)
logging.warning(
'Invalid item %s in Nancy data: %s.',
item.get('attributes', {}).get('OBJECTID', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
return out
def preprocess_paris(data):
out = []
for item in data:
try:
new_item = {
'fields': item['fields'],
'geometry': item['geometry'],
'recordid': item['recordid'],
'source': 'opendata-paris',
}
out.append(new_item)
except KeyError as exc:
logging.warning(
'Invalid item %s in Paris data: %s.',
item.get('recordid', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
@@ -183,17 +246,22 @@ def preprocess_rennes(data):
try:
new_item = {
'fields': item['properties'],
'geometry': item['geometry'],
'recordid': item['properties']['id']
'geometry': shape(item['geometry']),
'recordid': item['properties']['id'],
'source': 'opendata-rennes'
}
fields = new_item['fields']
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = item['geometry']
# Homogenize start date spelling
fields['date_debut'] = fields['date_deb']
# Homogenize geo shape
fields['geo_shape'] = item['geometry']
new_fields['date_debut'] = new_fields['date_deb']
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Rennes data: %s.', exc)
logging.warning(
'Invalid item %s in Rennes data: %s.',
item.get('properties', {}).get('id', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
@@ -206,15 +274,19 @@ def preprocess_seine_saint_denis(data):
try:
new_item = {
'fields': item['properties'],
'geometry': item['geometry'],
'recordid': item['properties']['id']
'geometry': shape(item['geometry']).centroid,
'recordid': item['properties']['id'],
'source': 'opendata-seine_saint_denis'
}
fields = new_item['fields']
# Homogenize geo shape
fields['geo_shape'] = item['geometry']
# Homogenize geo_shape
new_item['fields']['geo_shape'] = item['geometry']
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Seine-Saint-Denis data: %s.', exc)
logging.warning(
'Invalid item %s in Seine-Saint-Denis data: %s.',
item.get('properties', {}).get('id', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
@@ -222,35 +294,61 @@ def preprocess_seine_saint_denis(data):
def preprocess_sicoval(data):
out = []
for item in data:
fields = item['fields']
try:
new_item = {
'fields': item['fields'],
'geometry': item['geometry'],
'recordid': item['recordid'],
'source': 'opendata-sicoval'
}
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = new_fields['geoshape2']
# Homogenize start date and end date spelling
fields['date_debut'] = fields['startdate']
fields['date_fin'] = fields['enddate']
# Homogenize geo shape
fields['geo_shape'] = fields['geoshape2']
new_fields['date_debut'] = new_fields['startdate']
new_fields['date_fin'] = new_fields['enddate']
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Sicoval data: %s.', exc)
logging.warning(
'Invalid item %s in Sicoval data: %s.',
item.get('recordid', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
return data
return out
def preprocess_toulouse(data):
out = []
for item in data:
try:
fields = item['fields']
new_item = {
'fields': item['fields'],
'geometry': item['geometry'],
'recordid': item['recordid'],
'source': 'opendata-toulouse'
}
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = item['geometry']
# Homogenize start date and end date spelling
fields['date_debut'] = fields['datedebut']
fields['date_fin'] = fields['datefin']
new_fields['date_debut'] = new_fields['datedebut']
new_fields['date_fin'] = new_fields['datefin']
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Toulouse data: %s.', exc)
logging.warning(
'Invalid item %s in Toulouse data: %s.',
item.get('recordid', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
return data
return out
def preprocess_versailles(data):
@@ -265,6 +363,7 @@ def preprocess_versailles(data):
[item['geometry']['x'], item['geometry']['y']]
]
]
# In Versailles, an item with multiple paths yields multiple LineStrings
for path in paths:
if len(path) == 1:
geometry = {
@@ -278,22 +377,27 @@ def preprocess_versailles(data):
}
new_item = {
'fields': item['attributes'],
'geometry': geometry,
'recordid': item['attributes']['OBJECTID']
'geometry': shape(geometry).centroid,
'recordid': item['attributes']['OBJECTID'],
'source': 'opendata-versailles'
}
fields = new_item['fields']
new_fields = new_item['fields']
# Homogenize geo_shape
new_fields['geo_shape'] = geometry
# Homogenize start and end date spelling
fields['date_debut'] = arrow.get(
float(fields['STARTDATE']) / 1000
new_fields['date_debut'] = arrow.get(
float(new_fields['STARTDATE']) / 1000
).isoformat()
fields['date_fin'] = arrow.get(
float(fields['ENDDATE']) / 1000
new_fields['date_fin'] = arrow.get(
float(new_fields['ENDDATE']) / 1000
).isoformat()
# Homogenize geo shape
fields['geo_shape'] = geometry
out.append(new_item)
except KeyError as exc:
logging.warning('Invalid item in Versailles data: %s.', exc)
logging.warning(
'Invalid item %s in Versailles data: %s.',
item.get('attributes', {}).get('OBJECTID', '?'),
exc
)
if RAISE_ON_EXCEPT:
raise
continue
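
For reference, the ArcGIS-style records parsed here and in
preprocess_loiret look roughly like this (structure inferred from the
code above, values made up):

    item = {
        'attributes': {
            'OBJECTID': 42,
            'STARTDATE': 1540900800000,  # epoch milliseconds
            'ENDDATE': 1541505600000
        },
        'geometry': {
            # One entry per path: a single-point path becomes a Point,
            # a longer one becomes a LineString.
            'paths': [[[1.90, 47.90], [1.91, 47.91]]]
        }
    }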
@@ -306,6 +410,7 @@ OPENDATA_URLS = {
# https://opendata.lillemetropole.fr/explore/dataset/troncons-de-voirie-impactes-par-des-travaux-en-temps-reel/
# Licence Ouverte (Etalab) : https://www.etalab.gouv.fr/wp-content/uploads/2014/05/Licence_Ouverte.pdf
"lille": {
# https://opendata.lillemetropole.fr/explore/dataset/carrefours-de-voirie-impactes-par-des-travaux-temps-reel/information/
"preprocess": preprocess_lille,
"url": "https://opendata.lillemetropole.fr/explore/dataset/troncons-de-voirie-impactes-par-des-travaux-en-temps-reel/download/?format=json&timezone=Europe/Berlin"
},
@@ -341,7 +446,7 @@ OPENDATA_URLS = {
# https://opendata.paris.fr/explore/dataset/chantiers-perturbants/
# Licence ODbL : http://opendatacommons.org/licenses/odbl/
"paris": {
"preprocess": None,
"preprocess": preprocess_paris,
"url": "https://opendata.paris.fr/explore/dataset/chantiers-perturbants/download/?format=json&timezone=Europe/Berlin",
},
# Work in Rennes
@@ -406,13 +511,13 @@ def process_opendata(name, data, report_type=REPORT_TYPE):
(
# Either with an expiration_datetime in the future
(
(Report.expiration_datetime != None) &
(Report.expiration_datetime.is_null(False)) &
(Report.expiration_datetime > UTC_now())
) |
# Or without expiration_datetime but which are still active (shown
# on the map)
(
(Report.expiration_datetime == None) &
(Report.expiration_datetime.is_null(True)) &
(Report.downvotes < REPORT_DOWNVOTES_THRESHOLD)
)
)
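
A side note on the NULL checks above: Python's `is` operator cannot be
overloaded, so `Report.expiration_datetime is None` evaluates to a plain
boolean instead of building a SQL clause. peewee expresses NULL tests at
the field level, for example:

    # A constant, NOT a query fragment: `is` cannot be overloaded
    always_false = (Report.expiration_datetime is None)

    # peewee's way to express a NULL test inside a query
    active = Report.select().where(
        Report.expiration_datetime.is_null(True) &
        (Report.downvotes < REPORT_DOWNVOTES_THRESHOLD)
    )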
@@ -425,8 +530,6 @@ def process_opendata(name, data, report_type=REPORT_TYPE):
for item in data:
try:
fields = item['fields']
geometry = shape(item['geometry'])
position = geometry.centroid # Report position
# Check that the work is currently being done
now = arrow.now('Europe/Paris')
@@ -439,6 +542,41 @@ def process_opendata(name, data, report_type=REPORT_TYPE):
)
continue
# Report geographical shape
if 'geo_shape' in fields:
geo_shape = shape(fields['geo_shape'])
else:
geo_shape = shape(item['geometry'])
if isinstance(geo_shape, Point):
# Single point for the item, add a report on it
positions = [geo_shape]
elif isinstance(geo_shape, LineString):
# LineString (e.g. along a way), add a report at each end
# TODO: More positions, put a report at each intersection?
positions = [
Point(geo_shape.coords[0]),
Point(geo_shape.coords[-1])
]
elif isinstance(geo_shape, Polygon):
# TODO: Finer position, put a report at intersections?
positions = [
geo_shape.centroid,
]
elif isinstance(geo_shape, MultiPolygon):
# TODO: Finer position, put a report at intersections?
positions = [
p.centroid
for p in geo_shape
]
else:
logging.warning(
'Unsupported geometry for record %s: %s.',
item['recordid'],
geo_shape
)
continue
for position in positions:
# Check if this precise position is already in the database
if transform(project, position) in current_reports_points:
logging.info(
@@ -448,20 +586,17 @@ def process_opendata(name, data, report_type=REPORT_TYPE):
)
continue
# Check no similar report is nearby
if 'geo_shape' in fields:
geo_shape = shape(fields['geo_shape'])
else:
geo_shape = geometry
geo_shape = transform(project, geo_shape)
overlap_area = geo_shape.buffer(MIN_DISTANCE_REPORT_DETAILS)
overlap_area = transform(project, position).buffer(
MIN_DISTANCE_REPORT_DETAILS
)
is_already_inserted = False
for report_point in current_reports_points:
if report_point.within(geo_shape):
if report_point.within(overlap_area):
# A similar report is already there
is_already_inserted = True
logging.info(
('Ignoring record %s, a similar report is already in '
'the database.'),
('Ignoring record %s, a similar report is already '
'in the database.'),
item['recordid']
)
break
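
Taken together, the two hunks above turn one opendata record into one or
more candidate report positions, then drop any position that already has
a similar report nearby. A condensed sketch of that flow, assuming
`project` is the usual functools.partial/pyproj 1.x-style pairing the
imports suggest (the target projection and the buffer radius are
assumptions):

    import pyproj
    from functools import partial
    from shapely.geometry import Point, shape
    from shapely.ops import transform

    project = partial(
        pyproj.transform,
        pyproj.Proj(init='epsg:4326'),  # WGS84 lng/lat
        pyproj.Proj(init='epsg:2154')   # Lambert 93, in meters (assumption)
    )
    MIN_DISTANCE_REPORT_DETAILS = 40    # meters, value assumed

    geo_shape = shape({
        'type': 'LineString',
        'coordinates': [[4.85, 45.75], [4.86, 45.76]]
    })
    # A LineString yields one report at each end, as in the dispatch above.
    positions = [Point(geo_shape.coords[0]), Point(geo_shape.coords[-1])]

    current_reports_points = []         # projected points of known reports
    for position in positions:
        overlap_area = transform(project, position).buffer(
            MIN_DISTANCE_REPORT_DETAILS
        )
        if any(p.within(overlap_area) for p in current_reports_points):
            continue                    # a similar report already exists
        current_reports_points.append(transform(project, position))
        # ...create the Report here...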
@@ -475,12 +610,15 @@ def process_opendata(name, data, report_type=REPORT_TYPE):
expiration_datetime = end_date.replace(microsecond=0).naive
# Add the report to the db
logging.info('Adding record %s to the database.', item['recordid'])
logging.info('Adding record %s to the database.',
item['recordid'])
Report.create(
type=report_type,
expiration_datetime=expiration_datetime,
lat=lat,
lng=lng
lng=lng,
source=item['source'],
shape_geojson=json.dumps(mapping(geo_shape))
)
except KeyError as exc:
logging.warning(
@@ -490,6 +628,7 @@ def process_opendata(name, data, report_type=REPORT_TYPE):
exc
)
if __name__ == '__main__':
db.connect()
for name, item in OPENDATA_URLS.items():
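
The loop body is cut off by the diff; presumably each source is
downloaded and run through its preprocess hook before insertion, along
these lines (a hypothetical reconstruction):

    for name, item in OPENDATA_URLS.items():
        response = requests.get(item['url'])
        data = response.json()
        if item['preprocess']:
            data = item['preprocess'](data)
        process_opendata(name, data)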

@@ -51,6 +51,7 @@ class Report(BaseModel):
upvotes = peewee.IntegerField(default=0)
downvotes = peewee.IntegerField(default=0)
source = peewee.CharField(max_length=255)
shape_geojson = peewee.TextField(default=None, null=True)
def to_json(self):
return {

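
Since shape_geojson holds the area as a serialized GeoJSON string,
reading it back (the to_json body is cut off above) is a json.loads plus
shapely shape away. A sketch (the stored value is made up):

    import json
    from shapely.geometry import shape

    shape_geojson = '{"type": "Point", "coordinates": [4.85, 45.75]}'
    area = shape(json.loads(shape_geojson))  # back to a shapely geometry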
@@ -170,7 +170,8 @@ def post_report():
type=payload['type'],
lat=payload['lat'],
lng=payload['lng'],
source=payload.get('source', 'unknown')
source=payload.get('source', 'unknown'),
shape_geojson=payload.get('shape_geojson', None)
)
# Handle expiration
if r.type in ['accident', 'gcum']:
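
With this change, clients can attach an area to a report they post. A
hypothetical request (host, port, and endpoint path are assumptions):

    import json
    import requests

    payload = {
        'type': 'accident',
        'lat': 45.75,
        'lng': 4.85,
        'source': 'opendata-lyon',
        'shape_geojson': json.dumps({
            'type': 'Point',
            'coordinates': [4.85, 45.75]
        })
    }
    requests.post('http://localhost:8080/api/v1/reports', json=payload)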