Source code for straph.parser.parser

# Copyright (C) 2017-2021 Léo Rannou - Sorbonne Université/LIP6 - Thales
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import dateutil.parser as du
import dpkt
import json
import math
import os
import socket
import time
from collections import defaultdict
from sortedcollections import SortedSet
from tqdm import tqdm

from straph import stream as sg

# TODO : parse PCAP (adapt pcap_to_csv and related helpers), see pcap_reader.
# TODO : parse net, to finish (for Pajek datasets).

__nb_2_protocol__ = {0: 'IPv6_HbH',  # IPv6 Hop-by-Hop
                     1: 'ICMP',  # Internet Control Message
                     2: 'IGMP',  # Internet Group Management
                     3: 'GGP',  # Gateway-to-Gateway
                     4: 'IPv4',
                     5: 'ST2',  # Internet Stream Protocol v2, aka "IPv5"
                     6: 'TCP',  # Transmission Control
                     7: 'CBT',  # Core Based Trees
                     8: 'EGP',  # Exterior Gateway Protocol
                     17: 'UDP',  # User Datagram
                     41: 'IPv6',
                     43: 'IPv6_Route',  # Routing header for IPv6
                     47: 'GRE',  # Generic Routing Encapsulation
                     50: 'ESP',  # Encapsulating Security Payload
                     51: 'AH',  # Authentication Header
                     58: 'IPv6_ICMP',  # ICMP for IPv6
                     103: 'PIM',  # Protocol Independent Multicast
                     }
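
# Illustrative sketch (not part of the original module): pcap_to_csv below
# resolves a protocol *name* back to its IANA number with a reverse lookup
# over this dictionary, e.g.
#
#   >>> [k for k, v in __nb_2_protocol__.items() if v == 'TCP'][0]
#   6
#   >>> __nb_2_protocol__[17]
#   'UDP'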


def inet_to_str(inet):
    """
    Convert inet object to a string

    :param inet: inet network address
    :return: str: Printable/readable IP address
    """
    # First try ipv4, then ipv6
    try:
        return socket.inet_ntop(socket.AF_INET, inet)
    except ValueError:
        return socket.inet_ntop(socket.AF_INET6, inet)

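# Illustrative usage (addresses are hypothetical), round-tripping a packed
# address through the standard library:
#
#   >>> inet_to_str(socket.inet_pton(socket.AF_INET, '192.168.0.1'))
#   '192.168.0.1'
#   >>> inet_to_str(socket.inet_pton(socket.AF_INET6, '::1'))
#   '::1'
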
def datetime_to_timestamp(s):
    return du.parse(s).timestamp()

def pcap_to_csv(file_input, destination, protocol=None):
    """
    Transform a pcap file into a csv file.

    :param file_input:
    :param destination:
    :param protocol:
    :return:
    """
    counter = 0
    dict_ip = defaultdict(lambda: len(dict_ip))
    dict_label = {}
    if protocol:
        protocol = [key for key, value in __nb_2_protocol__.items() if value == protocol][0]
        print("protocol :", protocol)
    with open(destination, 'w') as output, open(file_input, 'rb') as input:
        writer = csv.writer(output, delimiter=';')
        writer.writerow(["time", "src", "dst", "protocol", "len", "src_port", "dst_port"])
        for ts, pkt in tqdm(dpkt.pcap.Reader(input)):
            eth = dpkt.ethernet.Ethernet(pkt)
            if counter == 0:
                t0 = ts
            counter += 1
            if isinstance(eth.data, bytes):
                continue
            ip = eth.data
            if ip.src is not None:
                ip_src = inet_to_str(ip.src).encode()
            else:
                continue
            ip_dst = inet_to_str(ip.dst).encode()
            id_src = dict_ip[ip_src]
            id_dst = dict_ip[ip_dst]
            dict_label[id_src] = ip_src
            dict_label[id_dst] = ip_dst
            # We ignore ICMP packets (ICMP scans are not relevant here).
            if ip.p == 1:
                continue
            if protocol:
                if ip.p != protocol:
                    continue
            if isinstance(eth.data, dpkt.ip6.IP6):
                len_pckt = ip.plen
            else:
                len_pckt = ip.len
            if isinstance(ip.data, dpkt.tcp.TCP) or isinstance(ip.data, dpkt.udp.UDP):
                tcp = ip.data
                src_port = tcp.sport
                dst_port = tcp.dport
            else:
                src_port = None
                dst_port = None
            writer.writerow([round(ts - t0, 6), id_src, id_dst, ip.p, len_pckt,
                             src_port, dst_port])

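# Illustrative usage (file names are hypothetical):
#
#   pcap_to_csv('capture.pcap', 'capture.csv', protocol='TCP')
#
# would write one row per non-ICMP TCP packet, with timestamps relative to the
# first packet and IP addresses replaced by integer identifiers.
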
def parse_net(input_file, output_file_nodes, output_file_links, link_duration=1):
    """
    A Stream Graph reader for datasets issued by Pajek.
    Format of interactions: .net

    :param input_file:
    :param output_file_nodes:
    :param output_file_links:
    :param link_duration:
    :return:
    """
    E = defaultdict(list)
    W = defaultdict(list)
    type_node = None
    with open(input_file, 'r') as input_file:
        for line in input_file:
            l = line.strip().split()
            if l[0] == '*Vertices':
                type_node = True
                continue
            if l[0] == '*Edges':
                type_node = False
                continue
            if type_node:
                continue
            else:
                u, v = int(l[0]), int(l[1])
                e = (u, v)
                if u == v:
                    # SELF LOOP: we ignore it
                    continue
                t = l[3].strip('[').strip(']').split(',')
                for current_time in t:
                    current_time = int(current_time)
                    if e in E and E[e][-1] >= current_time:
                        # Extend link presence
                        E[e][-1] = max(E[e][-1], current_time + link_duration)
                    else:
                        E[e] += [current_time, current_time + link_duration]
                    if u in W and W[u][-1] >= current_time:
                        # Extend node presence
                        W[u][-1] = max(W[u][-1], current_time + link_duration)
                    else:
                        W[u] += [current_time, current_time + link_duration]
                    if v in W and W[v][-1] >= current_time:
                        # Extend node presence
                        W[v][-1] = max(W[v][-1], current_time + link_duration)
                    else:
                        W[v] += [current_time, current_time + link_duration]
    with open(output_file_links, 'w') as output_file:
        for k, v in E.items():
            output_file.write(str(k[0]) + " " + str(k[1]) + " ")
            for t in v:
                output_file.write(str(t) + " ")
            output_file.write("\n")
    with open(output_file_nodes, 'w') as output_file:
        for k, v in W.items():
            output_file.write(str(k) + " ")
            for t in v:
                output_file.write(str(t) + " ")
            output_file.write("\n")
    return None

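# Worked sketch of the expected edge lines (hypothetical values): an edge line
# "3 7 1 [2,3,5]" is read as u=3, v=7 (the third field is ignored) with event
# times 2, 3 and 5.  With link_duration=1, the consecutive times 2 and 3 merge
# into the interval [2, 4], while 5 opens a separate interval [5, 6], so link
# (3, 7) ends up present on [2, 4] and [5, 6].
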
def parse_csv(input_file, entry_format, **kwargs):
    """
    Reader for .csv files.

    :param input_file:
    :param entry_format:
    :param kwargs:
    :return:
    """
    # Convert entry format
    t_pos, b_pos, e_pos, link_duration_pos = None, None, None, None
    if len(entry_format) == 3:
        (t_pos, u_pos, v_pos) = entry_format['t_pos'], entry_format['u_pos'], entry_format['v_pos']
    elif len(entry_format) == 4 and 'link_duration_pos' in entry_format:
        (t_pos, link_duration_pos, u_pos, v_pos) = entry_format['t_pos'], entry_format['link_duration_pos'], \
                                                   entry_format['u_pos'], entry_format['v_pos']
    elif len(entry_format) == 4 and 'b_pos' in entry_format:
        (b_pos, e_pos, u_pos, v_pos) = entry_format['b_pos'], entry_format['e_pos'], \
                                       entry_format['u_pos'], entry_format['v_pos']
    else:
        raise TypeError("Entry format is not supported, see documentation!")
    E = defaultdict(list)
    W = defaultdict(list)
    cnt_rows = 0
    nodes_to_label = {}
    label_to_id = defaultdict(lambda: len(label_to_id))
    min_t, max_t = math.inf, -math.inf
    with open(input_file, 'r') as input_file:
        reader = csv.reader(input_file, delimiter=kwargs['delimiter'])
        if kwargs['ignore_header']:
            next(reader, None)
        if kwargs['link_duration']:
            link_duration = kwargs['link_duration']
        elif 'link_duration_pos' not in entry_format and 'b_pos' not in entry_format:
            link_duration = 0
            print("[WARNING] No link_duration provided, link durations are set to 0.")
        for line in tqdm(reader, desc='Parsing CSV', total=kwargs['nrows'] - 1):
            cnt_rows += 1
            if cnt_rows > kwargs['nrows']:
                break
            if kwargs['nodes_to_label']:
                # Convert labels to int
                u_label = line[u_pos]
                v_label = line[v_pos]
                if u_label in {'', ' '} or v_label in {'', ' '}:
                    # Blank node label: we ignore the line
                    continue
                if u_label == v_label:
                    # SELF LOOP: we ignore it
                    continue
                # Labels seen for the first time are assigned a new id, len(label_to_id)
                u = label_to_id[u_label]
                v = label_to_id[v_label]
                nodes_to_label[u] = u_label
                nodes_to_label[v] = v_label
            else:
                u = int(line[u_pos])
                v = int(line[v_pos])
                if u == v:
                    # SELF LOOP: we ignore it
                    continue
            if kwargs['time_is_datetime']:
                if 't_pos' in entry_format:
                    t = datetime_to_timestamp(line[t_pos])
                elif 'b_pos' in entry_format:
                    b = datetime_to_timestamp(line[b_pos])
                    e = datetime_to_timestamp(line[e_pos])
                    link_duration = e - b
                    t = b
            else:
                if 't_pos' in entry_format:
                    t = float(line[t_pos].replace(',', ''))
                elif 'b_pos' in entry_format:
                    b = float(line[b_pos].replace(',', ''))
                    e = float(line[e_pos].replace(',', ''))
                    link_duration = e - b
                    t = b
            if 'link_duration_pos' in entry_format:
                link_duration = float(line[link_duration_pos].replace(',', ''))
            min_t, max_t = min(min_t, t), max(max_t, t + link_duration)
            if kwargs['is_directed']:
                l = (u, v)
            else:
                if (v, u) in E:
                    l = (v, u)
                else:
                    l = (u, v)
            if l in E and E[l][-1] >= t:
                E[l][-1] = max(E[l][-1], t + link_duration)
            else:
                E[l] += [t, t + link_duration]
            if kwargs['is_link_stream'] is False:
                if u in W and W[u][-1] >= t:
                    W[u][-1] = max(W[u][-1], t + link_duration)
                else:
                    W[u] += [t, t + link_duration]
                if v in W and W[v][-1] >= t:
                    W[v][-1] = max(W[v][-1], t + link_duration)
                else:
                    W[v] += [t, t + link_duration]
            else:
                W[u] = [min_t, max_t]
                W[v] = [min_t, max_t]
    if kwargs['is_link_stream'] is True:
        W = {k: [min_t, max_t] for k in W.keys()}
    if kwargs['delta']:
        delta = kwargs['delta']
        chrono = time.time()
        W, E = approximate_events(W, E, delta)
        print("\tApproximated events with delta:", delta, "in", time.time() - chrono, "s")
    S = sg.StreamGraph(times=[min_t, max_t],
                       nodes=list(W.keys()),
                       links=list(E.keys()),
                       node_presence=[W[k] for k in W.keys()],
                       link_presence=[E[k] for k in E.keys()],
                       node_to_label=nodes_to_label,
                       node_to_id={i: i for i in W.keys()})
    return S

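# parse_csv is normally reached through parser(), which supplies the option
# defaults.  A direct call would look roughly like this (file name, column
# positions and option values are hypothetical):
#
#   S = parse_csv('interactions.csv',
#                 entry_format={'t_pos': 0, 'u_pos': 1, 'v_pos': 2},
#                 delimiter=',', ignore_header=True, nrows=1000,
#                 link_duration=10, is_directed=False, is_link_stream=False,
#                 nodes_to_label=False, time_is_datetime=False, delta=None)
#
# Each row "t,u,v" then contributes the interval [t, t + 10] to link (u, v)
# and to both nodes' presences, overlapping intervals being merged on the fly.
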
def approximate_events(W, E, delta):
    """
    Approximation method reducing the number of distinct event times
    while preserving the connectivity properties of the original dataset.

    :param W:
    :param E:
    :param delta:
    :return:
    """
    # Collecting the times through a set may seem odd but avoids float imprecision.
    event_times = sorted(set([t for np in W.values() for t in np] +
                             [t for lp in E.values() for t in lp]))
    t_old = event_times[0]
    discretized_event_times = SortedSet()
    discretized_event_times.add(t_old)
    for t in event_times:
        if t - t_old >= delta:
            discretized_event_times.add(t)
            t_old = t
    new_W = {}
    for n, np in W.items():
        new_W[n] = []
        for t0, t1 in zip(np[::2], np[1::2]):
            assert t1 - t0 >= delta
            if t0 not in discretized_event_times:
                # Catch the first time after t0 in the discretized event times:
                t0 = discretized_event_times[discretized_event_times.bisect(t0)]
            if t1 not in discretized_event_times:
                # Catch the last time before t1 in the discretized event times:
                t1 = discretized_event_times[discretized_event_times.bisect(t1) - 1]
            new_W[n] += [t0, t1]
    new_E = {}
    for l, lp in E.items():
        new_E[l] = []
        for t0, t1 in zip(lp[::2], lp[1::2]):
            assert t1 - t0 >= delta
            if t0 not in discretized_event_times:
                # Catch the first time after t0 in the discretized event times:
                t0 = discretized_event_times[discretized_event_times.bisect(t0)]
            if t1 not in discretized_event_times:
                # Catch the last time before t1 in the discretized event times:
                t1 = discretized_event_times[discretized_event_times.bisect(t1) - 1]
            new_E[l] += [t0, t1]
    return new_W, new_E

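# Worked sketch of the delta-discretization (hypothetical values): with
# delta = 1, event times [0, 0.3, 1.2, 2.5, 3.1] are thinned to the SortedSet
# {0, 1.2, 2.5} (each kept time is at least delta after the previous kept one).
# An interval bound that is not a kept time is then snapped inwards:
# [0.3, 2.5] becomes [1.2, 2.5] and [0, 3.1] becomes [0, 2.5].
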
def parse_json(input_file, entry_format, **kwargs):
    """
    A Stream Graph reader for JSON datasets.

    :param input_file:
    :param entry_format:
    :param kwargs:
    :return:
    """
    # Convert entry format
    u_pos, v_pos, t_pos, b_pos, e_pos, link_duration_pos = None, None, None, None, None, None
    if len(entry_format) == 3:
        (t_pos, u_pos, v_pos) = entry_format['t_pos'], entry_format['u_pos'], entry_format['v_pos']
    elif len(entry_format) == 4 and 'link_duration_pos' in entry_format:
        (t_pos, link_duration_pos, u_pos, v_pos) = entry_format['t_pos'], entry_format['link_duration_pos'], \
                                                   entry_format['u_pos'], entry_format['v_pos']
    elif len(entry_format) == 4 and 'b_pos' in entry_format:
        (b_pos, e_pos, u_pos, v_pos) = entry_format['b_pos'], entry_format['e_pos'], \
                                       entry_format['u_pos'], entry_format['v_pos']
    E = defaultdict(list)
    W = defaultdict(list)
    cnt_rows = 0
    id_to_label = {}
    label_to_id = defaultdict(lambda: len(label_to_id))
    min_t, max_t = math.inf, -math.inf
    with open(input_file, 'r') as input_file:
        reader = json.load(input_file)
        for line in tqdm(reader, desc="Parsing JSON", total=kwargs['nrows']):
            cnt_rows += 1
            if cnt_rows > kwargs['nrows']:
                break
            if kwargs['nodes_to_label']:
                # Convert labels to int
                u_label = line[u_pos]
                v_label = line[v_pos]
                # Labels seen for the first time are assigned a new id, len(label_to_id)
                u = label_to_id[u_label]
                v = label_to_id[v_label]
                id_to_label[u] = u_label
                id_to_label[v] = v_label
            else:
                u = int(line[u_pos])
                v = int(line[v_pos])
            if kwargs['time_is_datetime']:
                if 't_pos' in entry_format:
                    t = datetime_to_timestamp(line[t_pos])
                elif 'b_pos' in entry_format:
                    b = datetime_to_timestamp(line[b_pos])
                    e = datetime_to_timestamp(line[e_pos])
                    link_duration = e - b
                    t = b
            else:
                if 't_pos' in entry_format:
                    t = float(line[t_pos].replace(',', ''))
                elif 'b_pos' in entry_format:
                    b = float(line[b_pos].replace(',', ''))
                    e = float(line[e_pos].replace(',', ''))
                    link_duration = e - b
                    t = b
            if kwargs['link_duration']:
                link_duration = kwargs['link_duration']
            elif 'link_duration_pos' in entry_format:
                link_duration = float(line[link_duration_pos].replace(',', ''))
            min_t, max_t = min(min_t, t), max(max_t, t + link_duration)
            if kwargs['is_directed']:
                l = (u, v)
            else:
                if (v, u) in E:
                    l = (v, u)
                else:
                    l = (u, v)
            if u == v:
                # SELF LOOP: we ignore it
                continue
            if l in E and E[l][-1] >= t:
                E[l][-1] = max(E[l][-1], t + link_duration)
            else:
                E[l] += [t, t + link_duration]
            if kwargs['is_link_stream'] is False:
                if u in W and W[u][-1] >= t:
                    W[u][-1] = max(W[u][-1], t + link_duration)
                else:
                    W[u] += [t, t + link_duration]
                if v in W and W[v][-1] >= t:
                    W[v][-1] = max(W[v][-1], t + link_duration)
                else:
                    W[v] += [t, t + link_duration]
    S = sg.StreamGraph(times=[min_t, max_t],
                       nodes=list(W.keys()),
                       links=list(E.keys()),
                       node_presence=[W[k] for k in W.keys()],
                       link_presence=[E[k] for k in E.keys()],
                       node_to_label=id_to_label)
    return S

def parse_pcap(file_input, entry_format, **options):
    pcap_to_csv(file_input, "tmp.csv")
    S = parse_csv("tmp.csv", entry_format, **options)
    os.remove("tmp.csv")
    return S

def parser(input_file, input_format, entry_format, output_file=None, simplify_presence=False,
           output_format='sg', **kwargs):
    """
    Straph's tunable parser. Compatible with several data formats: CSV, TSV, JSON and PCAP.

    :param simplify_presence:
    :param input_file: Input file (name only)
    :param input_format: Accepted input formats: JSON, CSV, PCAP
    :param entry_format: Format of each line to be read: (t, u, v) = (line[x], line[y], line[w])
    :param output_file: Output file (name only)
    :param output_format: Output formats: 'sg', 'sgf', 'json'
    :return:
    """
    with open(input_file) as ipt:
        options = {'delimiter': ',',
                   'is_link_stream': False,
                   'is_directed': False,
                   'nrows': sum(1 for _ in ipt),
                   'link_duration': False,
                   'order_sgf': False,
                   'ignore_header': True,
                   'nodes_to_label': False,
                   'time_is_datetime': False,
                   'delta': None,
                   }
    options.update(kwargs)
    if ('t_pos' in entry_format or 'link_duration_pos' in entry_format) and \
            ('b_pos' in entry_format or 'e_pos' in entry_format):
        raise TypeError('Invalid entry format: ' + str(entry_format) +
                        ' should be of type {t_pos,u_pos,v_pos} or'
                        ' {t_pos,link_duration_pos,u_pos,v_pos} or'
                        ' {b_pos,e_pos,u_pos,v_pos}!')
    if options['link_duration'] and ('b_pos' in entry_format or 'e_pos' in entry_format):
        raise TypeError('link_duration is incompatible with entry format: {b_pos,e_pos,u_pos,v_pos}!')
    if options['link_duration'] and ('link_duration_pos' in entry_format):
        raise TypeError('link_duration is incompatible with entry format: '
                        '{t_pos,link_duration_pos,u_pos,v_pos}!')
    if input_format == 'csv':
        S = parse_csv(input_file, entry_format, **options)
    elif input_format == 'json':
        S = parse_json(input_file, entry_format, **options)
    elif input_format == 'pcap':
        S = parse_pcap(input_file, entry_format, **options)
    elif input_format == 'net':
        raise ValueError("File format 'net' not yet supported.")
        # S = parse_net(input_format, entry_format, **options)
    else:
        raise TypeError('Format not supported')
    if simplify_presence is True:
        # Set nodes to be present from their first interaction to their last.
        S.node_presence = [[np[0], np[-1]] for np in S.node_presence]
    if output_file is not None:
        if isinstance(output_format, str):
            output_format = [output_format]
        for of in output_format:
            if of == 'sg':
                S.write_to_sg(output_file)
            elif of == 'json':
                S.write_to_json(output_file)
    return S

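# Illustrative usage (file names and column positions are hypothetical):
#
#   S = parser('interactions.csv', 'csv',
#              entry_format={'b_pos': 0, 'e_pos': 1, 'u_pos': 2, 'v_pos': 3},
#              delimiter=';', output_file='interactions', output_format='sg')
#
# reads a semicolon-separated file whose rows are "begin;end;u;v", builds a
# StreamGraph and writes it back in Straph's 'sg' format.
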
def sort_csv(input_file, entry_format, output=None, **kwargs):
    with open(input_file) as ipt:
        options = {'delimiter': ',',
                   'is_link_stream': False,
                   'is_directed': False,
                   'nrows': sum(1 for _ in ipt),
                   'link_duration': False,
                   'order_sgf': False,
                   'ignore_header': True,
                   'nodes_to_label': False,
                   'time_is_datetime': False,
                   'delta': None,
                   }
    options.update(kwargs)
    list_lines = []
    with open(input_file, 'r') as input:
        reader = csv.reader(input, delimiter=options['delimiter'])
        if options['ignore_header']:
            next(reader, None)
        for line in tqdm(reader, desc='Reading CSV before sorting', total=options['nrows']):
            list_lines.append(line)
    if len(entry_format) == 3:
        (t_pos, u_pos, v_pos) = entry_format['t_pos'], entry_format['u_pos'], entry_format['v_pos']
        if options['time_is_datetime']:
            list_lines = sorted(list_lines, key=lambda x: datetime_to_timestamp(x[t_pos]))
        else:
            list_lines = sorted(list_lines, key=lambda x: float(x[t_pos]))
    elif len(entry_format) == 4 and 'link_duration_pos' in entry_format:
        (t_pos, link_duration_pos, u_pos, v_pos) = entry_format['t_pos'], entry_format['link_duration_pos'], \
                                                   entry_format['u_pos'], entry_format['v_pos']
        if options['time_is_datetime']:
            list_lines = sorted(list_lines, key=lambda x: datetime_to_timestamp(x[t_pos]))
        else:
            list_lines = sorted(list_lines, key=lambda x: float(x[t_pos]))
    elif len(entry_format) == 4 and 'b_pos' in entry_format:
        (b_pos, e_pos, u_pos, v_pos) = entry_format['b_pos'], entry_format['e_pos'], \
                                       entry_format['u_pos'], entry_format['v_pos']
        if options['time_is_datetime']:
            list_lines = sorted(list_lines, key=lambda x: datetime_to_timestamp(x[b_pos]))
        else:
            list_lines = sorted(list_lines, key=lambda x: float(x[b_pos]))
    if output is None:
        output = input_file
    with open(output, 'w', newline='') as output:
        writer = csv.writer(output, delimiter=options['delimiter'])
        for line in tqdm(list_lines, desc='Writing CSV'):
            writer.writerow(line)

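# Illustrative usage (file names are hypothetical):
#
#   sort_csv('interactions.csv', {'t_pos': 0, 'u_pos': 1, 'v_pos': 2},
#            output='interactions_sorted.csv', delimiter=';')
#
# rewrites the rows sorted by increasing time (in place if no output is given).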