# -*- coding: utf-8 -*-
"""
© Copyright Institut National de l'Information Géographique et Forestière (2020)
Contributors:
Yann Méneroux
Marie-Dominique Van Damme
Creation date: 1th november 2020
tracklib library provides a variety of tools, operators and
functions to manipulate GPS trajectories. It is a open source contribution
of the LASTIG laboratory at the Institut National de l'Information
Géographique et Forestière (the French National Mapping Agency).
See: https://tracklib.readthedocs.io
This software is governed by the CeCILL-C license under French law and
abiding by the rules of distribution of free software. You can use,
modify and/ or redistribute the software under the terms of the CeCILL-C
license as circulated by CEA, CNRS and INRIA at the following URL
"http://www.cecill.info".
As a counterpart to the access to the source code and rights to copy,
modify and redistribute granted by the license, users are provided only
with a limited warranty and the software's author, the holder of the
economic rights, and the successive licensors have only limited
liability.
In this respect, the user's attention is drawn to the risks associated
with loading, using, modifying and/or developing or reproducing the
software by the user in light of its specific status of free software,
that may mean that it is complicated to manipulate, and that also
therefore means that it is reserved for developers and experienced
professionals having in-depth computer knowledge. Users are therefore
encouraged to load and test the software's suitability as regards their
requirements in conditions enabling the security of their systems and/or
data to be ensured and, more generally, to use and operate it in the
same conditions as regards security.
The fact that you are presently reading this means that you have had
knowledge of the CeCILL-C license and that you accept its terms.
Read network files
"""
import csv
import json
import os.path
import progressbar
import requests
from xml.dom import minidom
from tracklib.util.exceptions import *
from tracklib.core import ObsTime, ENUCoords, ECEFCoords, GeoCoords, Obs
from tracklib.algo import computeAbsCurv
from tracklib.core import Bbox, Track, Network, Node, Edge, SpatialIndex
from . import NetworkFormat
class NetworkReader:
"""
This class offers static methods to load network from file or from a wfs service.
"""
[docs]
@staticmethod
def readFromFile(path:str, formatfile:Union[str, NetworkFormat]="DEFAULT", verbose=True)->Network:
"""
Read a network from CSV file with geometry structured in coordinates.
Before to load network data, you need to have a format defined
in **network_file_format** (in tracklib/resources directory) which
describes metadata of the file. If it doesn't exists, you need to create one format.
For example, let's define a format call 'VTT' in the file *network_file_format*
which corresponds to a network associated with mountain bike tracks.
So, you add a new line like this:
*VTT, 1, 2, 3, 0, -1, 4, c, 1, True, UTF-8, GEO*
where:
- the first value 'VTT' represents the format name,
- the second value '1' represents the index of column containing edge identifier in the CSV file
- the third value '2' represents index of column containing source node identifier in the CSV file
- the fourth value '3' represents index of column containing target node identifier in the CSV file
- the fifth value '0' represents index of column containing geometry of edge in wkt format in the CSV file. You can specify also optional parameters:
- a path cost (arbitrarily set to be proportional to the length of the WKT if unlined (-1))
- an orientation index. An integer arbitrarily set to: 0 to indicate two-way, 1 direct way and -1 indirect way
- the separating characters (can be multiple characters). Can be: c (comma), b (blankspace), s (semi-column)
- the number of heading line in format
- true to manage double quote
- the encoding like utf-8
- srid: coordinate system of points (ENU, Geo or ECEF)
Parameters
-----------
:param path: file or directory
:param formatfile: name of format which describes metadata of the file
:return: network.
"""
if not os.path.isfile(path):
raise WrongArgumentError("First parameter (path) doesn't refers to a file.")
if not isinstance(formatfile, str) and not isinstance(formatfile, NetworkFormat):
raise WrongArgumentError("The second parameter is not an instantiation of a str or a NetworkFormat")
#print ("!!!", formatfile)
# Use format or read from resources
if isinstance(formatfile, NetworkFormat):
fmt = formatfile
else:
fmt = NetworkFormat(formatfile)
num_lines = sum(1 for line in open(path))
fmt.controlFormat()
if verbose:
print("Loading network...")
network = Network()
with open(path, encoding="utf-8") as csvfile:
spamreader = csv.reader(csvfile, delimiter=fmt.separator, doublequote=True)
# Header
cpt = 0
for row in spamreader:
cpt += 1
if cpt >= fmt.header:
break
if verbose:
spamreader = progressbar.progressbar(spamreader, max_value=num_lines)
for row in spamreader:
res = readLineAndAddToNetwork(row, fmt)
if res != None:
(edge, noeudIni, noeudFin) = res
network.addEdge(edge, noeudIni, noeudFin)
# Return network loaded
return network
counter = 0
NB_PER_PAGE = 1000
URL_SERVER = "https://data.geopf.fr/wfs/ows?"
URL_SERVER += "service=WFS&version=2.0.0&request=GetFeature&"
URL_SERVER += "typeName=BDTOPO_V3:troncon_de_route&"
# URL_SERVER += "srsName=EPSG:2154&"
URL_SERVER += "outputFormat=json&"
# URL_SERVER += "BBOX=44.538617443499014,5.808794912294471,45.05505710140573,6.644301708889899"
# URL_SERVER += "&count=3&startIndex=0"
resource_path = os.path.join(os.path.split(__file__)[0], "../..")
PROXY_FILE_FORMAT = os.path.join(resource_path, "resources/proxy")
[docs]
@staticmethod
def requestFromIgnGeoportail(
bbox:Bbox, tolerance=0.0001, spatialIndex=True, proxy=None
) -> Network:
"""
Parameters
-----------
:param bbox: The bounding box of the selected area (The bounding box must
be expressed in WGS84).
:param tolerance: parameter specifies the maximum distance accepted for merging two nodes (edge ends)
:param proxy: name of proxy or proxy parameters in a dictionnary. parameter connexion defined in the tracklib/resources/proxy file
"""
xmin = bbox.getXmin()
xmax = bbox.getXmax()
ymin = bbox.getYmin()
ymax = bbox.getYmax()
dx = (xmax - xmin) / 2
dy = (ymax - ymin) / 2
network = Network()
cptNode = 0
proj = "EPSG:4326"
srid = "GEO"
fmt = NetworkFormat()
fmt.createFromDict(
{
"name": "WFS",
"pos_edge_id": 0,
"pos_source": 5,
"pos_target": 6,
"pos_wkt": 1,
"pos_weight": 3,
"pos_direction": 4,
"srid": srid,
}
)
# PROXY
proxyDict = {}
if proxy != None and isinstance(proxy, str):
with open(NetworkReader.PROXY_FILE_FORMAT) as ffmt:
line = ffmt.readline().strip()
while line:
if line[0] == "#":
line = ffmt.readline().strip()
continue
tab = line.split(",")
if tab[0].strip() == proxy:
FIELDS = tab
proxyDict[FIELDS[1].lower()] = FIELDS[2]
line = ffmt.readline().strip()
ffmt.close()
if proxy != None and isinstance(proxy, dict):
proxyDict = proxy
# print (proxyDict)
nbRoute = NetworkReader.__getNbRouteEmprise(bbox, proxyDict)
if nbRoute == 0:
return None
nbiter = int(nbRoute / NetworkReader.NB_PER_PAGE) + 1
offset = 0
for j in range(nbiter):
print("PAGE " + str(j + 1) + "/" + str(nbiter))
URL_FEAT = NetworkReader.URL_SERVER
URL_FEAT += "BBOX=" + str(bbox[2]) + "," + str(bbox[0])
URL_FEAT += "," + str(bbox[3]) + "," + str(bbox[1])
URL_FEAT += "&count=" + str(NetworkReader.NB_PER_PAGE)
URL_FEAT += "&startIndex=" + str(offset)
URL_FEAT += "&RESULTTYPE=results"
URL_FEAT += "&srsname=" + proj
# print (URL_FEAT)
response = requests.get(URL_FEAT, proxies=proxyDict)
data = json.loads(response.text)
features = data["features"]
for feature in progressbar.progressbar(features):
row = []
# idd = feature["id"]
idd = feature['properties']['cleabs']
# nature = feature['properties']['nature']
fictif = feature["properties"]["fictif"]
if fictif == "True":
continue
row.append(idd)
# TODO
# pos = feature['properties']['position_par_rapport_au_sol']
TAB_OBS = []
coords = feature["geometry"]["coordinates"]
if feature["geometry"]["type"] == "LineString":
# print (str(len(coords)))
# geom = coords
# print (coords)
typeCoord = "GEOCOORDS"
if proj == "EPSG:2154":
typeCoord = "ENUCoords"
TAB_OBS = tabCoordsLineStringToObs(coords, typeCoord)
if len(TAB_OBS) < 2:
continue
track = Track(TAB_OBS)
row.append(track.toWKT())
row.append("ENU")
row.append(track.length())
# Orientation
sens = feature["properties"]["sens_de_circulation"]
orientation = Edge.DOUBLE_SENS
if sens == None or sens == "":
orientation = Edge.DOUBLE_SENS
elif sens == "Double sens" or sens == "Sans objet":
orientation = Edge.DOUBLE_SENS
elif sens == "Direct" or sens == "Sens direct":
orientation = Edge.SENS_DIRECT
elif sens == "Indirect" or sens == "Sens inverse":
orientation = Edge.SENS_INVERSE
else:
print(sens)
row.append(orientation)
# Source node
idNoeudIni = str(cptNode)
p1 = track.getFirstObs().position
candidates = selectNodes(network, Node("0", p1), tolerance)
if len(candidates) > 0:
c = candidates[0]
idNoeudIni = c.id
else:
cptNode += 1
# Target node
idNoeudFin = str(cptNode)
p2 = track.getLastObs().position
candidates = selectNodes(network, Node("0", p2), tolerance)
if len(candidates) > 0:
c = candidates[0]
idNoeudFin = c.id
else:
cptNode += 1
row.append(idNoeudIni)
row.append(idNoeudFin)
(edge, noeudIni, noeudFin) = readLineAndAddToNetwork(row, fmt)
network.addEdge(edge, noeudIni, noeudFin)
#
offset = offset + NetworkReader.NB_PER_PAGE
if spatialIndex:
network.spatial_index = SpatialIndex(
network, resolution=(dx / 1e3, dy / 1e3), margin=0.25
)
return network
# --------------------------------------------------------------------------
# Function to count road network features in bbox
# --------------------------------------------------------------------------
# Input :
# - v: a float value
# --------------------------------------------------------------------------
# Output : number
# --------------------------------------------------------------------------
@staticmethod
def __getNbRouteEmprise(bbox, proxyDict):
"""TODO"""
nb = 0
URL_HITS = NetworkReader.URL_SERVER
URL_HITS += "BBOX=" + str(bbox[2]) + "," + str(bbox[0])
URL_HITS += "," + str(bbox[3]) + "," + str(bbox[1])
URL_HITS += "&resulttype=hits"
# print (URL_HITS)
try:
res = requests.get(URL_HITS, proxies=proxyDict)
status = res.raise_for_status()
# print (status, type(status))
except requests.exceptions.RequestException:
print("ERROR: Failed to establish connection")
return 0
# x = requests.get(url, proxies = { "https" : "https://1.1.0.1:80"})
# print ('---------' + status, type(status))
dom = minidom.parseString(res.text)
result = dom.getElementsByTagName("wfs:FeatureCollection")
nb = int(result[0].attributes["numberMatched"].value)
return nb
def readLineAndAddToNetwork(row, fmt):
"""TODO"""
edge_id = str(row[fmt.pos_edge_id])
if fmt.pos_edge_id < 0:
edge_id = NetworkReader.counter
NetworkReader.counter = NetworkReader.counter + 1
geom = str(row[fmt.pos_wkt])
TAB_OBS = wktLineStringToObs(geom, fmt.srid.upper())
# Au moins 2 points
if len(TAB_OBS) < 2:
return None
track = Track(TAB_OBS)
computeAbsCurv(track)
edge = Edge(edge_id, track)
# Orientation
if (fmt.pos_direction == -1):
edge.orientation = Edge.DOUBLE_SENS
else:
orientation = int(row[fmt.pos_direction])
if orientation not in [Edge.DOUBLE_SENS, Edge.SENS_DIRECT, Edge.SENS_INVERSE]:
orientation = Edge.DOUBLE_SENS
edge.orientation = orientation
# Poids
if fmt.pos_weight == -1:
poids = track.length()
else:
poids = float(row[fmt.pos_weight])
edge.weight = poids
# Source node
source = str(row[fmt.pos_source])
noeudIni = Node(source, track.getFirstObs().position)
# Target node
target = str(row[fmt.pos_target])
noeudFin = Node(target, track.getLastObs().position)
# Return
return (edge, noeudIni, noeudFin)
# network.addEdge(edge, noeudIni, noeudFin)
def wktLineStringToObs(wkt, srid):
"""
Une polyligne de n points est modélisée par une Track (timestamp = 1970/01/01 00 :00 :00)
Cas LINESTRING()
"""
# Creation d'une liste vide
TAB_OBS = list()
# Separation de la chaine
coords_string = wkt.split("(")
coords_string = coords_string[1]
coords_string = coords_string.split(")")[0]
coords = coords_string.split(",")
for i in range(0, len(coords)):
sl = coords[i].strip().split(" ")
x = float(sl[0])
y = float(sl[1])
if len(sl) == 3:
z = float(sl[2])
else:
z = 0.0
if not srid.upper() in [
"ENUCOORDS",
"ENU",
"GEOCOORDS",
"GEO",
"ECEFCOORDS",
"ECEF",
]:
print("Error: unknown coordinate type [" + str(srid) + "]")
exit()
if srid.upper() in ["ENUCOORDS", "ENU"]:
point = ENUCoords(x, y, z)
if srid.upper() in ["GEOCOORDS", "GEO"]:
point = GeoCoords(x, y, z)
if srid.upper() in ["ECEFCOORDS", "ECEF"]:
point = ECEFCoords(x, y, z)
TAB_OBS.append(Obs(point, ObsTime()))
return TAB_OBS
def selectNodes(network, node, distance):
"""Selection des autres noeuds dans le cercle dont node.coord est le centre,
de rayon distance
:param node: le centre du cercle de recherche.
:param distance: le rayon du cercle de recherche.
:return: tableau de NODES liste des noeuds dans le cercle
"""
NODES = []
if network.spatial_index is None:
for key in network.getIndexNodes():
n = network.NODES[key]
if n.coord.distance2DTo(node.coord) <= distance:
NODES.append(n)
else:
vicinity_edges = network.spatial_index.neighborhood(node.coord, unit=1)
for e in vicinity_edges:
source = network.EDGES[network.getEdgeId(e)].source
target = network.EDGES[network.getEdgeId(e)].target
if source.coord.distance2DTo(node.coord) <= distance:
NODES.append(source)
if target.coord.distance2DTo(node.coord) <= distance:
NODES.append(target)
return NODES
def tabCoordsLineStringToObs(coords, srid):
"""TODO"""
# Creation d'une liste vide
TAB_OBS = list()
for i in range(0, len(coords)):
sl = coords[i]
x = float(sl[0])
y = float(sl[1])
if len(sl) == 3:
z = float(sl[2])
else:
z = 0.0
if not srid.upper() in [
"ENUCOORDS",
"ENU",
"GEOCOORDS",
"GEO",
"ECEFCOORDS",
"ECEF",
]:
print("Error: unknown coordinate type [" + str(srid) + "]")
exit()
if srid.upper() in ["ENUCOORDS", "ENU"]:
point = ENUCoords(x, y, z)
if srid.upper() in ["GEOCOORDS", "GEO"]:
point = GeoCoords(x, y, z)
if srid.upper() in ["ECEFCOORDS", "ECEF"]:
point = ECEFCoords(x, y, z)
TAB_OBS.append(Obs(point, ObsTime()))
return TAB_OBS