Source code for straph.components.strongly_connected_component

# Copyright (C) 2017-2021 Léo Rannou - Sorbonne Université/LIP6 - Thales
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy

from straph import dags as cdag
from straph.components import ConnectedComponent


[docs]def compute_strongly_connected_components(S, format="object_with_links", condensation_dag=False, isolated_nodes=True, streaming_output=None, free_memory=False): """ Compute Strongly Connected Components (SCC) of a ``StreamGraph``. :param free_memory: :param streaming_output: :param isolated_nodes: :param S: A Stream Graph :param format: Format of the output can be "cluster" or "scc_object" :param condensation_dag: Boolean, true if we want to output the Condensation DAG, false otherwise :return: """ node_2_status = {} # Dictionary associating a node to his current status : (current degree, number current comp) tmp_components = [] # List of current strongly connected components (object) final_components = [] # Clusters : [(t0,t1,u)]... cnt_scc_id = 0 # Condensation DAG if condensation_dag: condensation_dag = cdag.CondensationDag() else: condensation_dag = None # id_wcc = S.id E = S.ordered_batch_links(free_memory=free_memory) if streaming_output: opt = open(streaming_output, 'w') else: opt = None for batch in E: # print("\n Batch :",batch) c = batch[0][0] if c == 1: # ARRIVAL cnt_scc_id = process_batch_link_arrival(batch, node_2_status, tmp_components, final_components, cnt_scc_id, condensation_dag=condensation_dag, format=format, streaming_output=opt) else: # DEPARTURE cnt_scc_id = process_batch_link_departure(batch, node_2_status, tmp_components, final_components, cnt_scc_id, condensation_dag=condensation_dag, format=format, streaming_output=opt) # Add isolated Nodes if isolated_nodes: for c in S.isolated_nodes(): if format == "cluster": final_components.append([c]) if condensation_dag: condensation_dag.add_node([c]) elif format == "object" or format == "object_with_links": c = StronglyConnectedComponent(id=cnt_scc_id, times=[c[0], c[1]], nodes={c[2]}) final_components.append(c) if condensation_dag: condensation_dag.add_node(c) elif format == "streaming": c = (c[0], c[1], 1) if streaming_output: opt.write(str(c[0]) + ";" + str(c[1]) + ";" + str(1)) opt.write("\n") else: final_components.append(c) cnt_scc_id += 1 if condensation_dag: condensation_dag.set_id(id_wcc) return final_components, condensation_dag else: return final_components
[docs]def merge_scc(l, node_2_status, tmp_components, final_components, condensation_dag=None, cnt_scc_id=None, format="cluster", streaming_output=None): t0, t1, u, v = l id_comp_u = node_2_status[u][1] id_comp_v = node_2_status[v][1] if len(tmp_components[id_comp_v].nodes) > len(tmp_components[id_comp_u].nodes): # If a component is bigger than another we merge into the bigger one. id_comp_u, id_comp_v = id_comp_v, id_comp_u comp_1 = tmp_components[id_comp_u] comp_2 = tmp_components[id_comp_v] if comp_1.times[0] != t0: cnt_scc_id = close_component(comp_1, t0, final_components, cnt_scc_id, condensation_dag=condensation_dag, format=format, streaming_output=streaming_output) # predecessor_in_dag[n_comp_1] += [cnt_scc_id - 1] # previous closed component comp_1.set_begin_time(t0) if comp_2.times[0] != t0: cnt_scc_id = close_component(comp_2, t0, final_components, cnt_scc_id, condensation_dag=condensation_dag, format=format, streaming_output=streaming_output) # predecessor_in_dag[n_comp_1] += [cnt_scc_id - 1] # previous closed component (n_comp_1 because we merge) for n in comp_2.nodes: node_2_status[n][1] = id_comp_u # Actualize referencement before deletion of 2nd comp comp_1.merge(comp_2) comp_1.add_link(l) # Add the current link node_2_status[u][0] += 1 node_2_status[v][0] += 1 tmp_components[id_comp_v] = None return cnt_scc_id
[docs]def update_scc(node_to_update, node_to_add, l, node_2_status, tmp_components, final_components, condensation_dag=None, cnt_scc_id=None, format="cluster", streaming_output=None ): """ :param streaming_output: :param format: :param final_components: :param node_to_update: :param node_to_add: :param l: :param node_2_status: :param tmp_components: :param condensation_dag: :param cnt_scc_id: :return: """ t0, t1 = l[0], l[1] id_current_comp = node_2_status[node_to_update][1] current_comp = tmp_components[id_current_comp] if current_comp.times[0] != t0: cnt_scc_id = close_component(current_comp, t0, final_components, cnt_scc_id, condensation_dag=condensation_dag, format=format, streaming_output=streaming_output) # predecessor_in_dag[n_current_comp] += [cnt_scc_id - 1] # previous closed component current_comp.set_begin_time(t0) # Input a new begining time current_comp.add_node(node_to_add) # Add the node to the comp current_comp.add_link(l) # Actualize the component with the new link node_2_status[node_to_add] = [1, id_current_comp] node_2_status[node_to_update][0] += 1 return cnt_scc_id
[docs]def create_scc(l, node_2_status, tmp_components, format="cluster"): """ Create a Strongly Connected Component from the link *l* :param format: :param l: :param node_2_status: :param tmp_components: :return: """ t0, t1, u, v = l new_id_comp = len(tmp_components) node_2_status[u] = [1, new_id_comp] node_2_status[v] = [1, new_id_comp] if format == "object_with_links": lks = [[t0, t1, u, v]] else: lks = None tmp_components.append(StronglyConnectedComponent(times=[t0, t0], nodes={u, v}, active_links={(u, v)}, links=lks))
[docs]def close_component(comp, t, final_components, cnt_scc_id, condensation_dag=None, format="cluster", streaming_output=None): """ Close current component :param streaming_output: :param comp: :param t: :param final_components: :param cnt_scc_id: :param condensation_dag: :param format: :return: """ c = None if format == "object" or format == "object_with_links": copy_comp = copy.copy(comp) copy_comp.set_end_time(t) # Put an end time to the previous component copy_comp.id = cnt_scc_id final_components.append(copy_comp) c = copy_comp elif format == "cluster": c = [(comp.times[0], t, n) for n in comp.nodes] final_components.append(c) elif format == "streaming": n_nodes = len(comp.nodes) if streaming_output: streaming_output.write(str(comp.times[0]) + ";" + str(t) + ";" + str(n_nodes)) streaming_output.write("\n") else: c = (comp.times[0], t, n_nodes) final_components.append(c) if condensation_dag: condensation_dag.add_node(c) cnt_scc_id += 1 return cnt_scc_id
[docs]class StronglyConnectedComponent(ConnectedComponent): def __init__(self, id=None, times=None, nodes=None, active_links=None, links=None ): """ A basic constructor for a connected component object :param id: an identifier (nojoke) :param times: [beginning time, ending time] :param nodes: A set of nodes present in the component :param active_links: A set of links present in the component (Only useful during components computation) :param links: a list of 'segmented' links """ super().__init__(id, times, nodes, active_links, links) def __repr__(self): rep = "Id SCC :" + str(self.id) + " time window :" + str(self.times) + "\n" rep += "Nodes :" + str(self.nodes) + "\n" rep += "Links :" + str(self.links) + "\n" return rep def __copy__(self): comp_copy = StronglyConnectedComponent() comp_copy.times = copy.copy(self.times) if self.links: comp_copy.links = [copy.copy(l) for l in self.links] else: comp_copy.links = None comp_copy.nodes = copy.copy(self.nodes) return comp_copy
[docs] def split(self): R = super().split() # (current_nodes,current_links,active_links) return [StronglyConnectedComponent(nodes=c[0], links=c[1], active_links=c[2]) for c in R]
[docs] def get_stable_components(self, format="object"): """ Compute the stable connected components included in the current ``StronglyConnectedComponent`` :param format: :return: A list of ``StableConnectedComponents`` objects """ stable_components = [] if self.links and len(self.get_interactions_times()) > 1: interact_times = self.get_interactions_times() time_to_pos = {t: i for t, i in zip(interact_times, range(len(interact_times)))} inter_nodes = [set() for _ in range(len(interact_times) - 1)] inter_links = [[] for _ in range(len(interact_times) - 1)] for l in self.links: t0, t1, u, v = l t0 = max(t0, self.times[0]) t1 = min(t1, self.times[1]) for i in range(time_to_pos[t0], time_to_pos[t1]): inter_nodes[i].add(u) inter_nodes[i].add(v) inter_links[i].append((t0, t1, u, v)) if format == "object" or format == "object_with_links": for j in range(len(interact_times) - 1): c = StronglyConnectedComponent(id=(self.id, j), times=(interact_times[j], interact_times[j + 1]), nodes=set( [u for u in inter_nodes[time_to_pos[interact_times[j]]]]), links=[l for l in inter_links[time_to_pos[interact_times[j]]]] ) stable_components.append(c) if format == "cluster": for j in range(len(interact_times) - 1): c = [(interact_times[j], interact_times[j + 1], u) for u in inter_nodes[time_to_pos[interact_times[j]]]] stable_components.append(c) else: if format == "object" or format == "object_with_links": if self.links: stable_components = [StronglyConnectedComponent(id=self.id, times=self.times, nodes=self.nodes, links=self.links)] else: stable_components = [StronglyConnectedComponent(id=self.id, times=self.times, nodes=self.nodes)] if format == "cluster": stable_components = [[(self.times[0], self.times[1], u) for u in self.nodes]] return stable_components