...
 
Commits (2)
......@@ -165,7 +165,7 @@ def start_sniffer_on_network(graph: networkx.Graph,
# First the command is echoed
echos = ssh_helper.unchecked_run_commands_on_many_hosts(sessions, commands)
# Next we check for potential outputs
outputs = list(map(lambda s: ssh_helper.get_output(s, timeout=1), sessions))
outputs = list(map(lambda s: ssh_helper._get_output(s, timeout=1), sessions))
# Some real basic error checking
errors = None
......
......@@ -42,6 +42,9 @@ SILK_CONF_SENSOR_LINE_TEMPLATE="sensor {uuid} {name} \"{description}\""
FILE_PUSH_COMMAND_TEMPLATE= "sudo mkdir -p /{path}/ && sudo chown $USER /{path}/ && cat <<EOF >/{path}/{filename}\n{data}\nEOF"
COLLECTOR_LAUNCH_COMMAND="mkdir -p /{data_path}/{log_dir} && kill $(cat /{data_path}/{log_dir}/rwflowpack.pid) 2>/dev/null;" \
"rwflowpack --sensor-configuration=/{data_path}/sensors.conf --root-directory=/{data_path}/ --log-directory=/{data_path}/{log_dir}/"
SENSOR_CONF_PROBE_BLOCK_TEMPLATE="""probe {name} netflow-v9
listen-on-port {portnum}
protocol udp
......@@ -84,6 +87,11 @@ def _update_netgraph(netgraph, border_routers: List[str], listening_ports: List[
netgraph._node[router]['sensor_port'] = port
def _write_collector_launch(session, collector_launch_command: str) -> str:
output = ssh_helper.run_commands_on_many_hosts([session], [collector_launch_command])[0]
return output
def _write_sensors_conf(session, sensor_lines: List[SensorLine], port_nums: List[int]) -> None:
probes: List[str] = []
groups: List[str] = []
......@@ -108,7 +116,7 @@ def _write_sensors_conf(session, sensor_lines: List[SensorLine], port_nums: List
command = FILE_PUSH_COMMAND_TEMPLATE.format(filename="sensors.conf", path="/data/", data=sensors_conf)
ssh_helper.run_command_on_host(session, command)
ssh_helper.run_commands_on_many_hosts([session], [command])
def _write_silk_conf(session, sensor_lines: List[SensorLine]) -> None:
......@@ -132,7 +140,15 @@ def _write_silk_conf(session, sensor_lines: List[SensorLine]) -> None:
command = FILE_PUSH_COMMAND_TEMPLATE.format(filename="silk.conf", path="/data/", data=silk_conf)
ssh_helper.run_command_on_host(session, command)
ssh_helper.run_commands_on_many_hosts([session], [command])
def _build_collector_launch_command() -> str:
return COLLECTOR_LAUNCH_COMMAND.format(
data_path="/data/",
log_dir="/logs/",
)
def _build_port_nums(sensor_lines: List[SensorLine]) -> List[int]:
"""
......@@ -143,6 +159,7 @@ def _build_port_nums(sensor_lines: List[SensorLine]) -> List[int]:
port_nums = [18000 + line.uuid for line in sensor_lines]
return port_nums
def _build_sensor_lines(border_routers: List[str]) -> List[SensorLine]:
"""
Assign uuid, names, and descriptions for all border routers
......@@ -181,9 +198,12 @@ def configure(netgraph: networkx.Graph, controller_node: str, border_routers: Li
sensor_lines: List[SensorLine] = _build_sensor_lines(border_routers)
port_nums: List[int] = _build_port_nums(sensor_lines)
launch_command: str = _build_collector_launch_command()
_write_silk_conf(collector_session, sensor_lines)
_write_sensors_conf(collector_session, sensor_lines, port_nums)
_write_collector_launch(collector_session, launch_command)
_update_netgraph(netgraph, border_routers, port_nums)
......
......@@ -118,7 +118,7 @@ def network_graph_logout(netgraph):
del netgraph._node[node]['session']
def get_output(session, encoding=sys.stdout.encoding, timeout=None):
def _get_output(session, encoding=sys.stdout.encoding, timeout=None):
"""
Decode the raw bytes written by the SSH session
......@@ -131,7 +131,7 @@ def get_output(session, encoding=sys.stdout.encoding, timeout=None):
return str(session.before.decode(encoding))
def run_command_on_host(session, command):
def _run_command_on_host(session, command):
"""
Run a specified command on a single host
......@@ -158,12 +158,12 @@ def unchecked_run_commands_on_many_hosts(sessions: List[pxssh.pxssh], commands:
session = sessions[host_idx]
command = commands[host_idx]
run_command_on_host(session, command)
_run_command_on_host(session, command)
for host_idx in range(0, num_hosts):
session = sessions[host_idx]
output = get_output(session)
output = _get_output(session)
outputs.append(output)
return outputs
......@@ -199,10 +199,9 @@ def get_exit_codes(sessions) -> List[int]:
"""
Get the exit code of the last command run in each session
"""
commands = ["echo $?" for session in sessions]
outputs = unchecked_run_commands_on_many_hosts(sessions, commands)
codes_strs = [code.split()[2] for code in outputs] # Get just the return code (not the echo'ed command)
codes_strs = [list(filter(lambda line: len(line) > 0, code.split("\r\n")))[-1] for code in outputs] # Get just the return code (not the echo'ed command)
codes = [int(code) for code in codes_strs]
return codes
......