Source code for straph.dags.condensation_dag

# Copyright (C) 2017-2020 Léo Rannou - Sorbonne Université/LIP6 - Thales
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import math
import matplotlib.patches as mpatch
import matplotlib.pyplot as plt
import random
from collections import defaultdict, deque

from straph import components as comp
from straph import paths as pt
from straph import stream as sg
from straph.dags.dag import Dag
from straph.dags.stable_dag import StableDag


[docs]def compute_all_foremost_paths(dict_id_wcc_to_dag, index_node_to_scc, source, destination, duration_threshold=None, start_comp=None): """ Compute a foremost path between 'source' and 'destination" starting at 'begin time'. :param dict_id_wcc_to_dag: :param index_node_to_scc: :param source: :param destination: :param duration_threshold: :param start_comp: :return: """ start = source[0] source = source[1] # print("Foremost path from ", source, " to ", destination, " starting at time ", start) # Get starting component if start_comp: id_wcc, start_comp = start_comp else: id_wcc, start_comp = None, None for (i, id_scc) in index_node_to_scc[source]: id_wcc = i c = dict_id_wcc_to_dag[id_wcc].id_comp_to_comp[id_scc] if c.times[0] <= start <= c.times[1]: if destination in c.nodes: return [[id_scc]], 0, id_wcc start_comp = id_scc break if start_comp is None: print("Node : " + str(source) + " does not exist in the Stream Graph at time " + str(start) + " !") adj_list = defaultdict(list) end_time_comp = [] G = dict_id_wcc_to_dag[id_wcc] if G.c_links: for l in G.c_links: # ONLY IF the destination is accesible from begin time c = G.id_comp_to_comp[l[1]] if c.times[0] >= start: adj_list[l[0]].append(l[1]) end_time_comp.append(c.times[1]) if duration_threshold is None: if not end_time_comp: return None, None, None duration_threshold = max(end_time_comp) + 1 - start # print("duration threshold :",duration_threshold) # print(" adj_list : ", adj_list) # Custom BFS on DAG def bfs_scc(a_l, start_cmp, dst): path_queue = [(start_cmp, [start_cmp])] foremost_duration = duration_threshold # This variable, once assigned, is used as a threshold (yeah) while path_queue: (v, path) = path_queue.pop(0) # print(" v : ",v) if len(path_queue) > 0 and len(path_queue) % 10000 == 0: print(" len path queue :", len(path_queue)) if v in a_l and a_l[v]: for cc in a_l[v]: cmp = dict_id_wcc_to_dag[id_wcc].id_comp_to_comp[cc] print(" comp id :", cc) print(" comp times :", cmp.times) if dst in cmp.nodes: # print("PATH Found :",path) foremost_duration = min(foremost_duration, cmp.times[0] - start) yield path + [cc] # return path + [c] elif cmp.times[0] <= start + foremost_duration: path_queue.append((cc, path + [cc])) # print(" Start BFS") foremost_paths = list(bfs_scc(adj_list, start_comp, destination)) if not foremost_paths: return None, None, None path_times = [] for p in foremost_paths: t = [] for c in p: t.append(dict_id_wcc_to_dag[id_wcc].id_comp_to_comp[c].times[0]) path_times.append(t) # print("Path times :", path_times) min_t = min([p[-1] for p in path_times]) # print("Foremost time : ", min_t) fm_paths = [] for p, t in zip(foremost_paths, path_times): if t[-1] == min_t: fm_paths.append(p) return fm_paths, min_t - start, id_wcc
[docs]class CondensationDag(Dag): def __init__(self, id=None, times=None, c_nodes=None, c_links=None, id_comp_to_comp=None, node_to_id_comp=None, segmented_node_to_id_comp=None, adj_list=None, ): """ A basic constructor for a ``CondensationDag`` :param id: :param times: :param c_nodes : A list of nodes id (each node represents a ``StronglyConnectedComponent`` object: a set of nodes, a begin time, an end time) :param c_links : A list of directed link (each directed links connects two adjacent ``StronglyConnectedComponent``) :param id_comp_to_comp: :param node_to_id_comp: :param segmented_node_to_id_comp: :param adj_list: """ super().__init__(id, times, c_nodes, c_links, id_comp_to_comp, node_to_id_comp, segmented_node_to_id_comp, adj_list)
[docs] def get_stable_dag(self): # Add stables parts as stable connected components stable_DAG = StableDag() stable_DAG.set_id(self.id) stable_DAG.times = self.times cnt_c_nodes = 0 for cmp in self.c_nodes: stable_comps = cmp.get_stable_components(format="object") for c in stable_comps: c.id = cnt_c_nodes cnt_c_nodes += 1 stable_DAG.add_nodes(stable_comps) return stable_DAG
################################ # FORMAT # ################################
[docs] def cluster_to_object(self): new_cnodes = [] for id_cc, cc in self.id_comp_to_comp.items(): assert type(cc) == list new_cnodes = comp.StronglyConnectedComponent(id=id_cc, times=(cc[0][0], cc[0][1]), nodes=set([c[2] for c in cc])) self.c_nodes = new_cnodes self.id_comp_to_comp = {cc.id: cc for cc in new_cnodes}
############################### # Paths Methods # ###############################
[docs] def path_induced_substream(self, path, node_to_label=None, path_bounds=None): """ Transform a path in the condensation dag into a substream :param path: Sequence of identifiers of ``StronglyConnectedComponent`` objects in the current \ ``CondensationDag`` :param node_to_label: :param path_bounds: :return: """ if type(path[0]) is int: path = [self.id_comp_to_comp[id_scc] for id_scc in path] new_nodes = {} nodes_to_new_nodes = defaultdict(lambda: len(nodes_to_new_nodes)) new_node_to_label = {} new_node_to_id = {} new_links = {} t0_min, t1_max = math.inf, -math.inf for c in path: t0, t1 = c.times # Initial Bounds if path_bounds is not None: t0, t1 = max(path_bounds[0], t0), min(path_bounds[1], t1) # Bounds t0_min, t1_max = min(t0, t0_min), max(t1, t1_max) for n in c.nodes: if n in nodes_to_new_nodes: new_n = nodes_to_new_nodes[n] if t0 <= new_nodes[new_n][-1]: new_nodes[new_n][-1] = t1 else: new_nodes[new_n] += [t0, t1] else: new_n = nodes_to_new_nodes[n] new_node_to_id[new_n] = n if node_to_label: new_node_to_label[new_n] = node_to_label[n] new_nodes[new_n] = [t0, t1] if c.links is not None: for l in c.links: lt0, lt1 = l[0], l[1] if lt1 < t0 or lt0 > t1: # We do not consider links that end or begin outside the bounds continue lt0, lt1 = max(t0, lt0), min(t1, lt1) u, v = l[2], l[3] new_u, new_v = nodes_to_new_nodes[u], nodes_to_new_nodes[v] if (new_u, new_v) in new_links: if lt0 <= new_links[(new_u, new_v)][-1]: new_links[(new_u, new_v)][-1] = lt1 else: new_links[(new_u, new_v)] += [lt0, lt1] else: new_links[(new_u, new_v)] = [lt0, lt1] # print("l :",(node_to_label[new_u],node_to_label[new_v])) F = sg.StreamGraph(times=[t0_min, t1_max], nodes=list(new_nodes.keys()), node_presence=list(new_nodes.values()), node_to_label=new_node_to_label, node_to_id=new_node_to_id, links=list(new_links.keys()), link_presence=list(new_links.values())) return F
#################################################### # 1. Source-Destination Time to reach/ Latencies # #################################################### # TODO: Need to update functions in 1. with below functions in 2.
[docs] def temporal_node_to_scc(self, node): """ Return the ``StronglyConnectedComponent`` containing the temporal source *node*. :param node: :return: """ n = node[2] t0, t1 = node[0], node[1] for c in self.c_nodes: if n in c.nodes and t0 <= c.times[0] <= c.times[1] <= t1: return c print("Node : " + str(n) + " does not exist in the Stream Graph at time " + str(t0) + " !")
[docs] def node_to_scc(self, n): list_scc = [] for c in self.c_nodes: if n in c.nodes: list_scc.append(c) return list_scc
[docs] def time_to_reach(self, source, destination): if type(self.c_nodes[0]) == list: self.cluster_to_object() if type(source) is int: return self._time_to_reach(source, destination) else: return self._time_to_reach_temporal_nodes(source, destination)
def _time_to_reach(self, source, destination): # TODO : to finish return def _time_to_reach_temporal_nodes(self, source, destination): """ Return the time to reach the *destination* from the temporal source node *source* in the SG. :param source: :param destination: :return: """ ttr = math.inf a_l = self.adjacency_list() start_comp = self.temporal_node_to_scc(source) if start_comp is None: return ttr st = source[0] print("Start comp:", start_comp) queue = deque([start_comp]) # comp visited = {start_comp.id} while queue: cmp = queue.popleft() if destination[2] in cmp.nodes and \ destination[0] <= cmp.times[0] <= cmp.times[1] <= destination[1]: ttr = max(min(ttr, cmp.times[0] - st), 0) if cmp.id in a_l and a_l[cmp.id]: for c_id in a_l[cmp.id]: if c_id not in visited: c = self.id_comp_to_comp[c_id] if c.times[0] <= st + ttr: queue.append(c) visited.add(c_id) return ttr
[docs] def latency(self, source, destination): if type(self.c_nodes[0]) == list: self.cluster_to_object() if type(source) is int: raise ValueError("Source node is not yet supported as input for latency computation in CondensationDag.") else: return self._latency_temporal_nodes(source, destination)
def _latency_temporal_nodes(self, source, destination): """ Return the latency between the temporal node *source* and the temporal node *destination* in the SG. :param source: :param destination: :return: """ latency = math.inf unvisited = set() a_l = self.adjacency_list() for c in self.c_nodes: # On itere sur les comp contenant source if source[2] in c.nodes and source[0] <= c.times[0] <= c.times[1] <= source[1]: unvisited.add(c.id) if destination[2] in c.nodes and destination[0] <= c.times[0] <= c.times[1] <= destination[1]: latency = 0 return latency while len(unvisited) != 0: start_comp = self.id_comp_to_comp[unvisited.pop()] st = start_comp.times[1] # starting time queue = deque([(start_comp, st)]) # comp, starting time visited = {start_comp.id} while queue: cmp, st = queue.popleft() # We can reset the starting time if source in cmp.nodes and source[0] <= cmp.times[0] <= cmp.times[1] <= source[1]: st = cmp.times[1] unvisited.discard(cmp.id) # Update latencies if destination[2] in cmp.nodes and destination[0] <= cmp.times[0] <= cmp.times[1] <= destination[1]: latency = max(min(latency, cmp.times[0] - st), 0) if cmp.id in a_l and a_l[cmp.id]: for c_id in a_l[cmp.id]: if c_id not in visited: c = self.id_comp_to_comp[c_id] if c.times[0] <= st + latency: queue.append((c, st)) visited.add(c.id) return latency ##################################################### # 2. Single-Source Time to reach and Latencies # #####################################################
[docs] def times_to_reach_ss(self, source): if type(self.c_nodes[0]) == list: self.cluster_to_object() if type(source) is int: return self._times_to_reach_ss(source) else: return self._times_to_reach_temporal_nodes_ss(source)
[docs] def postprocess_ttr(self, source, ttr_comp): ttr = {} for n in self.node_to_id_comp: potential_ttr = [ttr_comp[c] for c in self.node_to_id_comp[n] if c in ttr_comp] if potential_ttr: ttr[n] = min(potential_ttr) ttr[source] = 0 return ttr
def _times_to_reach_ss(self, source): id_start_comp = self.node_to_id_comp[source][0] # The first SCC where the source appears start_comp = self.id_comp_to_comp[id_start_comp] ttr_comp = self._times_to_reach_comp_ss(id_start_comp, start_time=start_comp.times[0]) ttr = self.postprocess_ttr(source, ttr_comp) return ttr def _times_to_reach_temporal_nodes_ss(self, source): potential_start_comp = self.segmented_node_to_id_comp[source] id_start_comp = None for id_comp in potential_start_comp: comp_t0, comp_t1 = self.id_comp_to_comp[id_comp].times if comp_t0 <= source[0] <= comp_t1: id_start_comp = id_comp break ttr_comp = self._times_to_reach_comp_ss(id_start_comp, start_time=source[0]) ttr = self.postprocess_ttr(source[2], ttr_comp) return ttr def _times_to_reach_comp_ss(self, id_start_comp, start_time): """ Return the times to reach from the temporal source node :*source* to every other node in the SG. :param id_start_comp: :param start_time: :return: """ a_l = self.adjacency_list() times_to_reach_comp = {id_start_comp: 0} queue = deque([id_start_comp]) visited = {id_start_comp} while queue: id_comp = queue.popleft() cmp = self.id_comp_to_comp[id_comp] times_to_reach_comp[id_comp] = cmp.times[0] - start_time if id_comp in a_l and a_l[id_comp]: for c_id in a_l[id_comp]: if c_id not in visited: queue.append(c_id) visited.add(c_id) return times_to_reach_comp
[docs] def latencies_ss(self, source): if type(self.c_nodes[0]) == list: self.cluster_to_object() if type(source) is int: return self._latencies_ss(source) else: return self._latencies_temporal_nodes_ss(source)
[docs] def postprocess_latencies(self, latencies_comp, source): latencies = {} for n in self.node_to_id_comp: potential_latencies = [latencies_comp[c] for c in self.node_to_id_comp[n] if c in latencies_comp] if potential_latencies: latencies[n] = min(potential_latencies) latencies[source] = 0 return latencies
def _latencies_ss(self, source): ids_start_comp = self.node_to_id_comp[source] # SCCs where the source appears latencies_comp = self._latencies_comp_ss(ids_start_comp) latencies = self.postprocess_latencies(latencies_comp, source) return latencies def _latencies_temporal_nodes_ss(self, source): ids_start_comp = self.segmented_node_to_id_comp[source] # SCC where the source appears latencies_comp = self._latencies_comp_ss(ids_start_comp) latencies = self.postprocess_latencies(latencies_comp, source[2]) return latencies def _latencies_comp_ss(self, ids_start_comp): """ Return the latencies from the temporal source node: *source* to every other node in the SG. :param ids_start_comp: :return: """ a_l = self.adjacency_list() latencies = {i: 0 for i in ids_start_comp} # clefs : destination nodes ; valeurs : latency set_start_comps = set(ids_start_comp) unvisited = set(ids_start_comp) visited_to_st = {} # TODO : unvisited and set_start _comps necessaire ? while len(unvisited) != 0: id_start_comp = unvisited.pop() st = self.id_comp_to_comp[id_start_comp].times[1] # starting time visited_to_st[id_start_comp] = st queue = deque([(id_start_comp, st)]) # comp, starting time while queue: id_comp, st = queue.popleft() cmp = self.id_comp_to_comp[id_comp] if id_comp in set_start_comps: # We can reset the starting time st = cmp.times[1] unvisited.discard(id_comp) else: if id_comp in latencies: latencies[id_comp] = min(cmp.times[0] - st, latencies[id_comp]) else: latencies[id_comp] = cmp.times[0] - st if id_comp in a_l and a_l[id_comp]: for c_id in a_l[id_comp]: if c_id not in visited_to_st or st > visited_to_st[c_id]: # TODO : Verifier que la condition s'applique bien # On ne doit pas avoir c_id plusieurs fois dans la queue !! # Ou tester en sortie de pile ! # We leave later :) queue.append((c_id, st)) visited_to_st[c_id] = st return latencies ############################################## # 3. Arbitrary Foremost and fastest Paths # ##############################################
[docs] def foremost_path(self, source, destination): """ Compute a foremost path between 'source' and 'destination" starting at 'begin time'. :param source: :param destination: :return: """ st = source[0] start_comp = self.temporal_node_to_scc(source) adj_list = self.adjacency_list() # Custom BFS on DAG def bfs_scc(a_l, start_cmp, dst): path_queue = deque([(start_cmp, [start_cmp.id])]) ttr = math.inf # This variable, once assigned, is used as a threshold (yeah) visited = {start_cmp.id} while path_queue: (cmp, path) = path_queue.popleft() if cmp.id in a_l and a_l[cmp.id]: for v_id in a_l[cmp.id]: if v_id not in visited: v = self.id_comp_to_comp[v_id] visited.add(v_id) if dst[2] in v.nodes and dst[0] <= v.times[0] <= v.times[1] <= dst[1]: ttr = max(min(ttr, v.times[0] - st), 0) yield path + [v_id] elif v.times[0] <= st + ttr: path_queue.append((v, path + [v_id])) # print(" Start BFS") foremost_paths = list(bfs_scc(adj_list, start_comp, destination)) if not foremost_paths: return None, None path_times = [] for p in foremost_paths: t = [] for c_id in p: c = self.id_comp_to_comp[c_id] t.append(c.times[0]) path_times.append(t) # print("Path times :", path_times) min_t = min([p[-1] for p in path_times]) # print("Foremost time : ", min_t) fm_paths = [] for p, t in zip(foremost_paths, path_times): if t[-1] == min_t: fm_paths.append(p) return fm_paths, min_t - st
[docs] def fastest_path(self, source, destination): """ Compute the fastest path between 'source' and 'destination' :param source: :param destination: :return: """ a_l = self.adjacency_list() ids_start_comp = set(self.segmented_node_to_id_comp[source]) unvisited = copy.copy(ids_start_comp) # SCC where the source appears visited_to_st = {} latency = math.inf fastest_paths = None while len(unvisited) != 0: id_start_comp = unvisited.pop() start_comp = self.id_comp_to_comp[id_start_comp] st = start_comp.times[1] # starting time visited_to_st[id_start_comp] = st path_queue = deque([((id_start_comp, st), [id_start_comp])]) #  ((id comp, start time),path) while path_queue: e, path = path_queue.popleft() id_comp, st = e cmp = self.id_comp_to_comp[id_comp] if id_comp in ids_start_comp: # We can reset the starting time st = cmp.times[1] path = [id_comp] unvisited.discard(id_comp) if destination[2] in cmp.nodes and destination[0] <= cmp.times[0] <= cmp.times[1] <= destination[1]: new_latency = max(cmp.times[0] - st, 0) if new_latency < latency: fastest_paths = path latency = new_latency if id_comp in a_l and a_l[id_comp]: for c_id in a_l[id_comp]: if c_id not in visited_to_st: visited_to_st[c_id] = st path_queue.append(((c_id, st), path + [c_id])) elif st > visited_to_st[c_id]: # We leave later :) path_queue.append(((c_id, st), path + [c_id])) visited_to_st[c_id] = st return fastest_paths, latency
############################################ # 4. All foremost path and fastest paths # ############################################
[docs] def all_fastest_paths(self, source, destination): """ Compute the fastest path between 'source' and 'destination' :param source: :param destination: :return: """ a_l = self.adjacency_list() ids_start_comp = set(self.segmented_node_to_id_comp[source]) unvisited = copy.copy(ids_start_comp) # SCC where the source appears # We store visited component along with the starting time corresponding with the path that reached them visited_to_st = {} latency = math.inf fastest_paths = None while len(unvisited) != 0: id_start_comp = unvisited.pop() start_comp = self.id_comp_to_comp[id_start_comp] st = start_comp.times[1] # starting time visited_to_st[id_start_comp] = st path_queue = deque([((id_start_comp, st), [id_start_comp])]) #  ((id comp, start time),path) while path_queue: e, path = path_queue.popleft() id_comp, st = e cmp = self.id_comp_to_comp[id_comp] if id_comp in ids_start_comp: # We can reset the starting time st = cmp.times[1] path = [id_comp] unvisited.discard(id_comp) if destination[2] in cmp.nodes and destination[0] <= cmp.times[0] <= cmp.times[1] <= destination[1]: new_latency = max(cmp.times[0] - st, 0) if new_latency < latency: fastest_paths = [path] latency = new_latency elif new_latency == latency: fastest_paths.append(path) if id_comp in a_l and a_l[id_comp]: for c_id in a_l[id_comp]: if c_id not in visited_to_st: # We haven't seen the comp :) visited_to_st[c_id] = st path_queue.append(((c_id, st), path + [c_id])) elif st > visited_to_st[c_id]: # We leave later :) path_queue.append(((c_id, st), path + [c_id])) visited_to_st[c_id] = st return fastest_paths, latency
[docs] def all_fastest_paths_ss(self, source): """ Compute the fastest path between 'source' and all other nodes (in condensation DAG) :param source: :return: """ a_l = self.adjacency_list() ids_start_comp = set(self.segmented_node_to_id_comp[source]) unvisited = copy.copy(ids_start_comp) # SCC where the source appears latencies_comp = {i: 0 for i in ids_start_comp} # keys : destination nodes ;values : latency fastest_paths = {i: {i} for i in ids_start_comp} while len(unvisited) != 0: id_start_comp = unvisited.pop() start_comp = self.id_comp_to_comp[id_start_comp] st = start_comp.times[1] # starting time path_queue = deque([((id_start_comp, st), [id_start_comp])]) #  ((id comp,start_time),path) while path_queue: e, path = path_queue.popleft() id_comp, st = e cmp = self.id_comp_to_comp[id_comp] new_latency = cmp.times[0] - st if id_comp in ids_start_comp: # We can reset the starting time st = cmp.times[1] path = [id_comp] unvisited.discard(id_comp) else: if id_comp not in latencies_comp or latencies_comp[id_comp] > new_latency: latencies_comp[id_comp] = new_latency fastest_paths[id_comp] = {tuple(path)} elif latencies_comp[id_comp] == new_latency: fastest_paths[id_comp].add(tuple(path)) else: continue if id_comp in a_l and a_l[id_comp]: for c_id in a_l[id_comp]: # We haven't seen the comp or its worth it to continue :) cmp = self.id_comp_to_comp[c_id] if c_id not in latencies_comp or latencies_comp[c_id] >= cmp.times[0] - st: path_queue.append(((c_id, st), path + [c_id])) fastest_paths_nodes, latencies_nodes = self.postprocess_fastest_paths(fastest_paths, latencies_comp) del fastest_paths_nodes[source[2]] return fastest_paths_nodes, latencies_nodes
[docs] def postprocess_fastest_paths(self, fastest_paths_comp, latencies_comp): # FP and Latencies in DAG to FP and Lat in Stream : latencies_nodes = defaultdict(lambda: math.inf) fastest_paths_nodes = {} for n in self.node_to_id_comp: lat = math.inf for c in self.node_to_id_comp[n]: if c in latencies_comp: if latencies_comp[c] < lat: fastest_paths_nodes[n] = fastest_paths_comp[c].copy() latencies_nodes[n] = latencies_comp[c] lat = latencies_comp[c] elif latencies_comp[c] == lat: fastest_paths_nodes[n] |= fastest_paths_comp[c] return fastest_paths_nodes, latencies_nodes
############################################### # 5. Shortest Fastest Path # # (Hybrids Methods) # ###############################################
[docs] def shortest_fastest_path_ss(self, source, node_to_label=None): """ Compute Shortest Fastest Paths in a single source manner in a ``StreamGraph`` using the current ``CondensationDag``. :param source: :param node_to_label: :return: """ lengths = defaultdict(lambda: math.inf) fastest_paths_nodes, latencies_nodes = self.all_fastest_paths_ss(source) lengths[source[2]] = 0 for destination in fastest_paths_nodes: list_fp = fastest_paths_nodes[destination] for fp in list_fp: if type(fp) == int: fp = [fp] # print("\nfp :",fp) # print("destination :",destination) # assert source[2] in self.id_comp_to_comp[fp[0]].nodes # assert destination in self.id_comp_to_comp[fp[-1]].nodes # If length of the cdag path > 1 : last time of the first comp; first time of the last comp # Because they are SCC and all stream nodes are reachables at any instant. path_bounds = False if len(fp) > 1: path_bounds = (self.id_comp_to_comp[fp[0]].times[1], self.id_comp_to_comp[fp[-1]].times[0]) # print("Path_bounds :",path_bounds) # print("latencies destination :",latencies_nodes[destination]) # assert path_bounds[1] - path_bounds[0] == latencies_nodes[destination] # else: # assert latencies_nodes[destination] == 0 # print("Path_bounds :",path_bounds) substream = self.path_induced_substream(fp, path_bounds=path_bounds, node_to_label=node_to_label) #  Get Substream for DAG paths stream_node_to_substream_node = {v: k for k, v in substream.node_to_id.items()} substream_source = stream_node_to_substream_node[source[2]] substream_destination = stream_node_to_substream_node[destination] # assert destination in stream_node_to_substream_node # assert source[2] in stream_node_to_substream_node #  Get source in Substream b, e = None, None npres = substream.node_presence[substream_source] for nt0, nt1 in zip(npres[::2], npres[1::2]): if source[0] <= nt0 <= nt1 <= source[1]: b, e = nt0, nt1 break # print("Source in stream :",source) # print("Source in substream :",(b,e,substream_source)) # print("Destination in substream :",substream_destination) # substream.plot() # plt.show() #  Compute distances in Substream # print("Latency in Stream :",latencies_nodes[destination]) # L,D = substream.latencies_and_lengths(((b, e, substream_source))) D = substream.distances((b, e, stream_node_to_substream_node[source[2]])) # if substream_destination not in D: # print("nb wcc substream :",substream.number_of_weakly_connected_component()) # wcc = substream.weakly_connected_components() # adj_list = substream.instant_graph(substream.times[0], label=False) # plot_adjacency_list(substream, adj_list, label = False) # substream.plot() # plt.show() # if fp == [8, 25, 12, 13] and destination == 3 and source[2] == 7: # # adj_list = substream.instant_graph(substream.times[0], label=False) # # plot_adjacency_list(substream, adj_list, label = False) # self.plot() # substream.plot() # plt.show() # print("Latency in substream :",L[substream_destination]) # print("length in substream :",D[substream_destination]) # assert L[substream_destination] == latencies_nodes[destination] lengths[destination] = min(lengths[destination], D[substream_destination]) # print("Length SFP HYBRID :",lengths[destination]) return latencies_nodes, lengths
# END PATHS # ######################################### # Reachability Queries # #########################################
[docs] def is_reachable(self, source, target): if self.latency(source, target): return True else: return False
######################################### # Plot Functions # #########################################
[docs] def plot_foremost_path(self, path, ttr, source, destination, col="#8f246b"): """ Draw a path on the current ``CondensationDag`` :param path: A Stream Graph :param ttr: :param source: :param destination: :param col: :return: """ if type(path[0]) is int: path = [self.id_comp_to_comp[id_scc] for id_scc in path] S = self.path_induced_substream(path) S.plot() rectangles = [] for c in path: t0, t1 = c.times[0], c.times[1] for n in c.nodes: if n == source[2]: color = '#4d79ff' elif n == destination[2]: color = '#00cc00' else: color = col rectangles.append(mpatch.Rectangle((t0, n - 0.15), width=t1 - t0, height=0.3, edgecolor=color, facecolor=color, alpha=1 )) # Plot a single rectangle for COMP rectangles.append(mpatch.Rectangle((t0, min(c.nodes) + 0.15), width=t1 - t0, height=max(c.nodes) - min(c.nodes) - 0.3, edgecolor=col, facecolor=col, alpha=0.3 )) ax = plt.gca() for r in rectangles: ax.add_artist(r) # Plot temporal source and temporal destination plt.plot([source[0]], [source[2]], color='#000099', marker='^', alpha=1, markersize=13) plt.plot([source[0] + ttr], [destination[2]], color='#004d00', marker='v', alpha=1, markersize=13) plt.title("Foremost Path from " + str(source) + " to " + str(destination))
# plt.tight_layout()
[docs] def plot_fastest_path(self, path, latency, source, destination, col="#8f246b"): """ Draw a path on the current ``CondensationDag`` :param path: :param latency: :param source: :param destination: :param col: :return: """ if type(path[0]) is int: path = [self.id_comp_to_comp[id_scc] for id_scc in path] S = self.path_induced_substream(path) S.plot() rectangles = [] for c in path: t0, t1 = c.times[0], c.times[1] for n in c.nodes: if n == source[2]: color = '#4d79ff' elif n == destination[2]: color = '#00cc00' else: color = col rectangles.append(mpatch.Rectangle((t0, n - 0.15), width=t1 - t0, height=0.3, edgecolor=color, facecolor=color, alpha=1 )) # Plot a single rectangle for COMP rectangles.append(mpatch.Rectangle((t0, min(c.nodes) + 0.15), width=t1 - t0, height=max(c.nodes) - min(c.nodes) - 0.3, edgecolor=col, facecolor=col, alpha=0.3 )) source_comp = path[0] ax = plt.gca() for r in rectangles: ax.add_artist(r) # Plot temporal source and temporal destination plt.plot([source_comp.times[1]], [source[2]], color='#000099', marker='o', alpha=1, markersize=13) plt.plot([source_comp.times[1] + latency], [destination[2]], color='#004d00', marker='d', alpha=1, markersize=13) plt.title("Fastest Path from " + str(source) + " to " + str(destination))
# plt.tight_layout()
[docs] def refactor_path(self, path_comp, path_times, source, destination): print("Path comp :", path_comp) print("Path Time :", path_times) P = pt.Path([], []) cur_node = source for i in range(len(path_comp) - 1): print("Current comp:", path_comp[i]) print("Next comp:", path_comp[i + 1]) cur_comp = self.c_nodes[path_comp[i]] next_comp = self.c_nodes[path_comp[i + 1]] intersec_nodes = cur_comp.nodes & next_comp.nodes if cur_node in intersec_nodes: # No need to jump continue else: next_node = random.choice(list(intersec_nodes)) print("Intersec_nodes :", intersec_nodes) print("current node :", cur_node, " next node:", next_node) path_inside_comp = cur_comp.random_path(cur_node, next_node) print("path_inside_comp", path_inside_comp) cur_node = next_node for j in range(len(path_inside_comp[1]) - 1): P.add_link((path_inside_comp[1][j], path_inside_comp[1][j + 1]), path_times[i]) print() path_inside_comp = self.c_nodes[path_comp[-1]].random_path(cur_node, destination) for j in range(len(path_inside_comp[1]) - 1): P.add_link((path_inside_comp[1][j], path_inside_comp[1][j + 1]), path_inside_comp[0]) print("path times :", P.times) print("path nodes :", P.links) return P