diff --git a/pelab/monitor/monitor.py b/pelab/monitor/monitor.py index 4c53a0b7ee23dfa558f95401ab191ddcfe4d5fcb..44ba142a551f71c6d1de39f98f20434ffb07edf8 100755 --- a/pelab/monitor/monitor.py +++ b/pelab/monitor/monitor.py @@ -14,6 +14,15 @@ import socket import select import re +CODE_BANDWIDTH = 1 +CODE_DELAY = 2 +CODE_LOSS = 3 +CODE_LIST_DELAY = 4 + +PACKET_WRITE = 1 +PACKET_SEND_BUFFER = 2 +PACKET_RECEIVE_BUFFER = 3 + emulated_to_real = {} real_to_emulated = {} emulated_to_interface = {} @@ -21,7 +30,7 @@ ip_mapping_filename = '' this_experiment = '' this_ip = '' stub_ip = '' -netmon_output_version = 1 +netmon_output_version = 2 total_size = 0 last_total = -1 @@ -96,6 +105,7 @@ def populate_ip_tables(): def get_next_packet(): global total_size, last_total + event_code = PACKET_WRITE line = sys.stdin.readline() if line == "": raise EOFError @@ -110,11 +120,11 @@ def get_next_packet(): linexp = re.compile('^(\d+\.\d+) > (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) (\((\d+)\))?') match = linexp.match(line) - conexp = re.compile('^(New|Closed): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+)') + conexp = re.compile('^(New|Closed|SO_RCVBUF|SO_SNDBUF): (\d+):(\d+\.\d+\.\d+\.\d+):(\d+) ((\d+))?') cmatch = conexp.match(line) - if (match) : + if match: localport = 0 # We may not get this one - if (netmon_output_version == 1): + if netmon_output_version == 1: time = float(match.group(1)) ipaddr = match.group(2) remoteport = int(match.group(3)) @@ -122,7 +132,7 @@ def get_next_packet(): size = int(match.group(5)) elif (netmon_output_version == 2): time = float(match.group(1)) - localport = match.group(2) + localport = int(match.group(2)) ipaddr = match.group(3) remoteport = int(match.group(4)) size_given = match.group(5) != '' @@ -133,16 +143,29 @@ def get_next_packet(): if not size_given: size = 0 total_size = total_size + size - return (ipaddr, localport, remoteport, time, size) + return (ipaddr, localport, remoteport, time, size, event_code) elif ((netmon_output_version == 2) and cmatch): # # Watch for new or closed connections # event = cmatch.group(1) - localport = cmatch.group(2) + localport = int(cmatch.group(2)) ipaddr = cmatch.group(3) + remoteport = int(cmatch.group(4)) + value_given = cmatch.group(5) != '' + value = int(cmatch.group(6)) + if not value_given: + value = 0 + sys.stdout.write("Got a connection event: " + event + "\n") - return None + if event == 'SO_RCVBUF': + event_code = PACKET_RECEIVE_BUFFER + elif event == 'SO_SNDBUF': + event_code = PACKET_SEND_BUFFER + sys.stdout.write('Packet send buffer was set to: ' + str(value) + '\n') + else: + return None + return (ipaddr, localport, remoteport, 0, value, event_code) else: sys.stdout.write('skipped line in the wrong format: ' + line) return None @@ -158,25 +181,39 @@ def receive_characteristic(conn): # sys.stdout.write('received: ' + str(dest) + ' ' + str(source_port) + ' ' # + str(dest_port) + ' ' + str(command) + ' ' + str(value) # + '\n') - if command == 1: + if command == CODE_BANDWIDTH: # value is bandwidth in kbps + sys.stdout.write('Bandwidth: ' + str(value) + '\n'); set_bandwidth(value, dest) - elif command == 2: + elif command == CODE_DELAY: # value is delay in milliseconds + sys.stdout.write('Delay: ' + str(value) + '\n'); set_delay(value, dest) - elif command == 3: + elif command == CODE_LOSS: # value is packet loss in packets per billion + sys.stdout.write('Loss: ' + str(value) + '\n'); set_loss(value/1000000000.0, dest) + elif command == CODE_LIST_DELAY: + # value is the number of samples + buffer = conn.recv(value*8) + # Dummynet isn't quite set up to deal with this yet, so ignore it. + sys.stdout.write('Ignoring delay list of size: ' + str(value) + '\n') + else: + sys.stdout.write('Other: ' + str(command) + ', ' + str(value) + '\n'); return True elif len(buffer) == 0: return False def set_bandwidth(kbps, dest): - sys.stdout.write(' bandwidth=' + str(kbps) + '\n') +# sys.stdout.write(' bandwidth=' + str(kbps) + '\n') return set_link(this_ip, dest, 'bandwidth=' + str(kbps)) # Set delay on the link. We are given round trip time. def set_delay(milliseconds, dest): + now = time.time() + sys.stderr.write('purple\n') + sys.stderr.write('line ' + ('%0.6f' % now) + ' 0 ' + ('%0.6f' % now) + + ' ' + str(milliseconds) + '\n') # Set the delay from here to there to 1/2 rtt. error = set_link(this_ip, dest, 'delay=' + str(milliseconds/2)) if error == 0: @@ -204,11 +241,16 @@ def send_destinations(conn, packet_list): prev_time = packet_list[0][3] for packet in packet_list: ip = ip_to_int(emulated_to_real[packet[0]]) + delta = int((packet[3] - prev_time) * 1000) + if packet[3] == 0: + delta = 0 output = (output + save_int(ip) + save_short(packet[1]) + save_short(packet[2]) - + save_int(int((packet[3] - prev_time) * 1000)) - + save_int(packet[4])) - prev_time = packet[3] + + save_int(delta) + + save_int(packet[4]) + + save_short(packet[5])) + if packet[3] != 0: + prev_time = packet[3] conn.sendall(output) def load_int(str): diff --git a/pelab/monitor/run-iperf.sh b/pelab/monitor/run-iperf.sh new file mode 100644 index 0000000000000000000000000000000000000000..759c78372e21a3e64fde42121e5d9a5266575737 --- /dev/null +++ b/pelab/monitor/run-iperf.sh @@ -0,0 +1 @@ +sh instrument.sh ../iperf -i 0.5 -c elab1 -t 30 diff --git a/pelab/reset.sh b/pelab/reset.sh new file mode 100644 index 0000000000000000000000000000000000000000..e83979fa5e9e52879fcf03861fe8b60a2a6938be --- /dev/null +++ b/pelab/reset.sh @@ -0,0 +1,12 @@ +/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 delay=5 +/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 delay=5 +/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 bandwidth=100000 +/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 bandwidth=100000 +/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 lpr=0.0 +/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 lpr=0.0 +/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 delay=5 +/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 delay=5 +/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 bandwidth=10000 +/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 bandwidth=10000 +/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 lpr=0.0 +/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 lpr=0.0 diff --git a/pelab/stub/Makefile b/pelab/stub/Makefile index 7a4887da680ebae656fa9c769fc272be958035e2..b584b897c1d7d72cc7e8ec483fc0e4fc83c7e56d 100644 --- a/pelab/stub/Makefile +++ b/pelab/stub/Makefile @@ -1,15 +1,25 @@ -all: stubd stub-monitor +all: stubd stub-monitor dumb-client dumb-server -stubd: stubd.o stub-pcap.o lookup.o stub.h - g++ -g -Wall stubd.o stub-pcap.o lookup.o -lm -lpcap -o stubd +stubd: stubd.o stub-pcap.o lookup.o log.o + g++ -g -Wall stubd.o stub-pcap.o lookup.o log.o -lm -lpcap -o stubd stub-monitor: stub-monitor.c stub.h gcc -g -Wall stub-monitor.c -o stub-monitor -stubd.o: stubd.c stub.h +stubd.o: stubd.c stub.h log.h gcc -g -Wall -c stubd.c -stub-pcap.o: stub-pcap.c stub.h +stub-pcap.o: stub-pcap.c stub.h log.h gcc -g -Wall -c stub-pcap.c -lookup.o: lookup.cc stub.h +lookup.o: lookup.cc stub.h log.h g++ -g -Wall -c lookup.cc +log.o: log.c log.h + gcc -g -Wall -c log.c +dumb-client: dumb-client.o + gcc -g -Wall dumb-client.o -lm -lpcap -o dumb-client +dumb-client.o: dumb-client.c stub.h + gcc -g -Wall -c dumb-client.c +dumb-server: dumb-server.o + gcc -g -Wall dumb-server.o -lm -lpcap -o dumb-server +dumb-server.o: dumb-server.c stub.h + gcc -g -Wall -c dumb-server.c clean: rm *.o stubd stubm stub-monitor diff --git a/pelab/stub/dumb-client.c b/pelab/stub/dumb-client.c new file mode 100644 index 0000000000000000000000000000000000000000..d8536d406df11b8cd413760bacddfbdcce24589a --- /dev/null +++ b/pelab/stub/dumb-client.c @@ -0,0 +1,47 @@ +// dumb-client.c + +#include "stub.h" + +int main(int argc, char * argv[]) +{ + char buffer[MAX_PAYLOAD_SIZE] = {0}; + int connection = -1; + int error = 0; + struct sockaddr_in address; + + if (argc != 3) + { + printf("Usage: %s \n", argv[0]); + return 1; + } + + connection = socket(AF_INET, SOCK_STREAM, 0); + if (connection == -1) + { + printf("Failed socket()\n"); + return 1; + } + + address.sin_family = AF_INET; + address.sin_port = htons(atoi(argv[2])); + inet_aton(argv[1], &address.sin_addr); + memset(address.sin_zero, '\0', 8); + + error = connect(connection, (struct sockaddr *)&address, sizeof(address)); + if (error == -1) + { + printf("Failed connect()\n"); + return 1; + } + + int count = 0; + + while (1) + { +// usleep(1000); + count = send(connection, buffer, MAX_PAYLOAD_SIZE, MSG_DONTWAIT); +// printf("Sent %d bytes\n", count); + } + + return 0; +} diff --git a/pelab/stub/dumb-server.c b/pelab/stub/dumb-server.c new file mode 100644 index 0000000000000000000000000000000000000000..45f380d6cd4d60d76745a946f74656693e0e3d4b --- /dev/null +++ b/pelab/stub/dumb-server.c @@ -0,0 +1,60 @@ +// dumb-server.c + +#include "stub.h" + +int main(int argc, char * argv[]) +{ + char buffer[MAX_PAYLOAD_SIZE] = {0}; + int connection = -1; + int error = 0; + struct sockaddr_in address; + int client = -1; + int address_size = sizeof(address); + + if (argc != 2) + { + printf("Usage: %s \n", argv[0]); + return 1; + } + + connection = socket(AF_INET, SOCK_STREAM, 0); + if (connection == -1) + { + printf("Failed socket()\n"); + return 1; + } + + address.sin_family = AF_INET; + address.sin_port = htons(atoi(argv[1])); + address.sin_addr.s_addr = htonl(INADDR_ANY); + memset(address.sin_zero, '\0', 8); + + error = bind(connection, (struct sockaddr *)&address, sizeof(address)); + if (error == -1) + { + printf("Failed bind()\n"); + return 1; + } + + error = listen(connection, 1); + if (error == -1) + { + printf("Failed listen()\n"); + return 1; + } + + client = accept(connection, (struct sockaddr *)&address, &address_size); + if (client == -1) + { + printf("Failed accept()\n"); + return 1; + } + + while (1) + { + recv(client, buffer, MAX_PAYLOAD_SIZE, 0); + } + + return 0; +} + diff --git a/pelab/stub/log.c b/pelab/stub/log.c new file mode 100644 index 0000000000000000000000000000000000000000..db2e724c2f527de83e9837b7c5222bce1d690620 --- /dev/null +++ b/pelab/stub/log.c @@ -0,0 +1,104 @@ +// log.c + +#include "stub.h" +#include "log.h" + +static FILE * logFile; +static int logFlags; +static int logTimestamp; + +void logInit(FILE * destFile, int flags, int useTimestamp) +{ + if (destFile == NULL || flags == LOG_NOTHING) + { + logFile = NULL; + logFlags = LOG_NOTHING; + logTimestamp = 0; + } + else + { + logFile = destFile; + logFlags = flags; + logTimestamp = useTimestamp; + } +} + +void logCleanup(void) +{ +} + +// Print the timestamp and type of logging to the logFile. +static void logPrefix(int flags, struct timeval const * timestamp) +{ + if (flags & CONTROL_SEND) + { + fprintf(logFile, "CONTROL_SEND "); + } + if (flags & CONTROL_RECEIVE) + { + fprintf(logFile, "CONTROL_RECEIVE "); + } + if (flags & TCPTRACE_SEND) + { + fprintf(logFile, "TCPTRACE_SEND "); + } + if (flags & TCPTRACE_RECEIVE) + { + fprintf(logFile, "TCPTRACE_RECEIVE "); + } + if (flags & SNIFF_SEND) + { + fprintf(logFile, "SNIFF_SEND "); + } + if (flags & SNIFF_RECEIVE) + { + fprintf(logFile, "SNIFF_RECEIVE "); + } + if (flags & PEER_WRITE) + { + fprintf(logFile, "PEER_WRITE "); + } + if (flags & PEER_READ) + { + fprintf(logFile, "PEER_READ "); + } + if (flags & MAIN_LOOP) + { + fprintf(logFile, "MAIN_LOOP "); + } + if (flags & LOOKUP_DB) + { + fprintf(logFile, "LOOKUP_DB "); + } + if (flags & DELAY_DETAIL) + { + fprintf(logFile, "DELAY_DETAIL "); + } + if (logTimestamp) + { + struct timeval now; + struct timeval const * timeptr = timestamp; + if (timeptr == NULL) + { + gettimeofday(&now, NULL); + timeptr = &now; + } + fprintf(logFile, "%d.%d ", (int)(timeptr->tv_sec), + (int)((timeptr->tv_usec)/1000)); + } + fprintf(logFile, ": "); +} + +void logWrite(int flags, struct timeval const * timestamp, + char const * format, ...) +{ + va_list va; + va_start(va, format); + if ((flags & logFlags) != 0) + { + logPrefix(flags & logFlags, timestamp); + vfprintf(logFile, format, va); + fprintf(logFile, "\n"); + } + va_end(va); +} diff --git a/pelab/stub/log.h b/pelab/stub/log.h new file mode 100644 index 0000000000000000000000000000000000000000..caa31beedc2d44cb216d40d74b78ef0bab89527a --- /dev/null +++ b/pelab/stub/log.h @@ -0,0 +1,83 @@ +// log.h + +#ifndef _LOG_H +#define _LOG_H + +#ifdef __cplusplus +extern "C" +{ +#endif + +// The logging framework. Use logInit() during initialization to set +// up logging. Then use log() to generate messages. logCleanup() +// should be called during program shutdown. + +// logInit() opens a file, sets what types of logging messages to +// actually print, and determines whether timestamps should be +// prepended onto messages. Logging is disabled if 'filename' is NULL +// or 'flags' is 0. +// +// 'filename' is the name of the file to be used for logging. It is +// opened in append mode. +// +// 'flags' is the logical or of one or more of the LOG_TYPEs defined +// below or 0. This determines which of the message types to print out. +// +// If 'useTimestamp' is 1, then timestamps are prepended to each +// message, otherwise, they are ignored. +void logInit(FILE * destFile, int flags, int useTimestamp); + +// logCleanup() cleans up the logging state. +void logCleanup(void); + +// The logWrite() function is used to print a logging message. +// +// 'flags' is the logical or of one or more of the LOG_TYPEs defined +// below. This tags the logging message so it can be selectively +// filtered. This should never be '0'. If it is '0', the message will +// not be printed. +// 'timestamp' is a pointer to a timestamp which will be prepended +// onto the logging message. If it is NULL, the current time is +// used. Note that whether or not the timestamp is printed at all is +// set elsewhere. +// 'format' printf-style formatting string. It and any extra variables +// act just like a normal printf call. This is where you actually +// write your message. It will be written to the logging file. +void logWrite(int flags, struct timeval const * timestamp, + char const * format, ...); + +enum LOG_TYPE +{ + // Information about control messages being sent to the monitor + CONTROL_SEND = 0x1, + // Information about control messages being received from the monitor + CONTROL_RECEIVE = 0x2, + // Information about sent control messages in a format used in + // tcptrace graphs. + TCPTRACE_SEND = 0x4, + // Information about received control messages in a format used in + // tcptrace graphs. + TCPTRACE_RECEIVE = 0x8, + // Information about sent packets which were sniffed by stub-pcap. + SNIFF_SEND = 0x10, + // Information about received packets which were sniffed by stub-pcap. + SNIFF_RECEIVE = 0x20, + // Information about writing to peers. + PEER_WRITE = 0x40, + // Information about reading from peers. + PEER_READ = 0x80, + // Which stages are being run in the main loop. + MAIN_LOOP = 0x100, + // Database storage and reterieval + LOOKUP_DB = 0x200, + DELAY_DETAIL = 0x400, + // Shortcuts for common cases. + LOG_NOTHING = 0x00, + LOG_EVERYTHING = 0x7ff +}; + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/pelab/stub/lookup.cc b/pelab/stub/lookup.cc index 52cc78d6d80264d989b9fbc55170cff986841311..d739a411f0964ac54e7e34764f5c55c9b2450cbd 100755 --- a/pelab/stub/lookup.cc +++ b/pelab/stub/lookup.cc @@ -48,7 +48,7 @@ struct Address }; -void init_connection(struct connection * conn) +void init_connection(connection * conn) { if (conn != NULL) { @@ -191,8 +191,8 @@ int for_each_to_monitor(send_to_monitor function, int monitor) std::map::iterator limit = stub_receivers.end(); for (; pos != limit && result == 1; ++pos) { - int index = pos->second; - result = function(monitor, index); + int index = pos->second; + result = function(monitor, index); } return result; } @@ -230,13 +230,48 @@ int insert_by_address(unsigned long ip, unsigned short source_port, return result; } +int insert_fake(unsigned long ip, unsigned short port) +{ + int result = find_by_address(ip, 0, 0); + if (result == FAILED_LOOKUP && ! empty_receivers.empty()) + { + // Update the lookup structures. + int index = * empty_receivers.begin(); + empty_receivers.erase(index); + + Address address_key = Address(ip, 0, 0); + address_receivers.insert(std::make_pair(address_key, index)); + + Stub stub_key = Stub(ip, port); + stub_receivers.insert(std::make_pair(stub_key, index)); + + // Connect to the index. + reset_receive_records(index, ip, 0, 0); + rcvdb[index].sockfd = -1; + // Set the new port and sockfd + rcvdb[index].stub_port = port; + + // Reset sniffing records + sniff_rcvdb[index].start = 0; + sniff_rcvdb[index].end = 0; + throughput[index].isValid = 0; + loss_records[index].loss_counter=0; + loss_records[index].total_counter=0; + last_loss_rates[index]=0; + delays[index]=0; + last_delays[index]=0; + result = index; + } + return result; +} + void reconnect_receiver(int index) { struct sockaddr_in dest_addr; dest_addr.sin_family = AF_INET; dest_addr.sin_port = htons(SENDER_PORT); dest_addr.sin_addr.s_addr = rcvdb[index].ip; - memset(&(dest_addr.sin_zero), '\0', 8); +// memset(&(dest_addr.sin_zero), '\0', 8); int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { @@ -250,6 +285,12 @@ void reconnect_receiver(int index) perror("connect"); clean_exit(1); } + int send_buf_size = 0; + int int_size = sizeof(send_buf_size); + getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &send_buf_size, + (socklen_t*)&int_size); + fprintf(stderr, "Socket buffer size: %d\n", send_buf_size); + struct sockaddr_in source_addr; socklen_t len = sizeof(source_addr); int sockname_error = getsockname(sockfd, (struct sockaddr *)&source_addr, @@ -321,7 +362,6 @@ void set_pending(int index, fd_set * write_fds) { if (rcvdb[index].valid == 1) { - printf("Set sockfd: %d, maxfd: %d\n", rcvdb[index].sockfd, maxfd); pending_receivers.insert(index); FD_SET(rcvdb[index].sockfd, write_fds); } @@ -331,7 +371,6 @@ void clear_pending(int index, fd_set * write_fds) { if (rcvdb[index].valid == 1) { - printf("Clear sockfd: %d\n", rcvdb[index].sockfd); pending_receivers.erase(index); FD_CLR(rcvdb[index].sockfd, write_fds); rcvdb[index].pending = 0; diff --git a/pelab/stub/stub-pcap.c b/pelab/stub/stub-pcap.c index 011ec873e8efd34e5e66e4d1d2d7f17d53967ec7..df4313e4b5b41561d90d57b710782c60ec22b840 100644 --- a/pelab/stub/stub-pcap.c +++ b/pelab/stub/stub-pcap.c @@ -1,4 +1,5 @@ #include "stub.h" +#include "log.h" /* tcpdump header (ether.h) defines ETHER_HDRLEN) */ #ifndef ETHER_HDRLEN @@ -36,6 +37,7 @@ struct my_ip { struct in_addr ip_src,ip_dst; /* source and dest address */ }; + sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS]; pcap_t* descr; int pcapfd; @@ -262,8 +264,11 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char u_int caplen = pkthdr->caplen; u_short len, hlen, version, tcp_hlen, ack_bit; u_long seq_start, seq_end, ack_seq, ip_src, ip_dst; + unsigned short source_port = 0; + unsigned short dest_port = 0; int path_id, record_id, msecs, end, flag_resend=0; sniff_path *path; +// struct in_addr debug_addr; /* jump pass the ethernet header */ ip = (struct my_ip*)(packet + sizeof(struct ether_header)); @@ -328,8 +333,29 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char seq_start = ntohl(tp->seq); seq_end = ((unsigned long)(seq_start+length)); ack_bit= ((tp)->ack & 0x0001); - - path_id = search_rcvdb(ip_dst); + source_port = htons(tp->source); + dest_port = htons(tp->dest); + +// path_id = search_rcvdb(ip_dst); + + // If there is a fake entry, the stub_port entry will be the + // destination port requested by the command line. +// debug_addr.s_addr = ip_src; +// printf("ip_src: %s ", inet_ntoa(debug_addr)); +// debug_addr.s_addr = ip_dst; +// printf("ip_dst: %s, dest_port: %d, source_port: %d\n", +// inet_ntoa(debug_addr), dest_port, source_port); + path_id = find_by_stub_port(ip_dst, dest_port); +// printf("outgoing path_id: %d\n", path_id); + if (path_id == -1 || rcvdb[path_id].source_port != 0 + || rcvdb[path_id].dest_port != 0) + { + // I contacted the receiver. Therefore, my port is unique and + // the receiver's port is fixed. The destination is the + // receiver, therefore my port is the one that is of interest. + path_id = find_by_stub_port(ip_dst, source_port); +// printf("stub path_id (outgoing): %d\n", path_id); + } if (path_id != -1) { //a monitored outgoing packet //ignore the pure outgoing ack if ((ack_bit==1) && (seq_end==seq_start)) { @@ -378,7 +404,23 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char } } else { - path_id = search_rcvdb(ip_src); +// path_id = search_rcvdb(ip_src); + // If there is a fake entry, and the packet is incoming, then + // the source_port will be the remote port requested on the + // command line. + path_id = find_by_stub_port(ip_src, source_port); +// printf("incoming path_id: %d\n", path_id); + + if (path_id == -1 || rcvdb[path_id].source_port != 0 + || rcvdb[path_id].dest_port != 0) + { + // I contacted the receiver, so my port is unique and their + // port is the same every time. This means that if a packet is + // coming from them, the destination port is the one of + // interest. + path_id = find_by_stub_port(ip_src, dest_port); +// printf("stub path_id (incoming): %d\n", path_id); + } if (path_id != -1) { //a monitored incoming packet if (ack_bit == 1) { //has an acknowledgement ack_seq = ntohl(tp->ack_seq); @@ -393,6 +435,8 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5); delays[path_id] = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs; append_delay_sample(path_id, delays[path_id]); + logWrite(DELAY_DETAIL, &(pkthdr->ts), + "Delay: %lu", delays[path_id]); } pop_sniff_rcvdb(path_id, (unsigned long)(ack_seq-1)); //advance the sniff window base } //ack in rcvdb @@ -465,54 +509,75 @@ u_int16_t handle_ethernet (u_char *args,const struct pcap_pkthdr* pkthdr,const u return ether_type; } -void init_pcap(int to_ms) { - char *dev; +void init_pcap(int to_ms, unsigned short port, char * device, int is_live) { char errbuf[PCAP_ERRBUF_SIZE]; - struct bpf_program fp; /* hold compiled program */ - bpf_u_int32 maskp; /* subnet mask */ - bpf_u_int32 netp; /* ip */ - char string_filter[128]; - //struct in_addr addr; - dev = sniff_interface; //input parameter of stubd, should be "vnet" or "eth0" if (flag_debug) { - printf("The sniff_interface: %s \n", sniff_interface); - } - - /* ask pcap for the network address and mask of the device */ - pcap_lookupnet(dev,&netp,&maskp,errbuf); - //For an unknown reason, netp has the wrong 4th number - //addr.s_addr = netp; - sprintf(string_filter, "port %d and tcp", SENDER_PORT); - - /* open device for reading. - * NOTE: We use non-promiscuous */ - descr = pcap_open_live(dev, BUFSIZ, 0, to_ms, errbuf); - if(descr == NULL) { - printf("Error: pcap_open_live(): %s\n",errbuf); - exit(1); + printf("The sniff_interface: %s \n", device); } - + if (is_live) + { + logWrite(MAIN_LOOP, NULL, + "Initializing pcap live interface with device %s, and port %d", + device, port); + // We are running using live data. device refers to the network + // device. + struct bpf_program fp; /* hold compiled program */ + bpf_u_int32 maskp; /* subnet mask */ + bpf_u_int32 netp; /* ip */ + char string_filter[128]; + + /* ask pcap for the network address and mask of the device */ + pcap_lookupnet(device, &netp, &maskp, errbuf); + //For an unknown reason, netp has the wrong 4th number + //addr.s_addr = netp; + sprintf(string_filter, "port %d and tcp", port); + + /* open device for reading. + * NOTE: We use non-promiscuous */ + descr = pcap_open_live(device, BUFSIZ, 0, to_ms, errbuf); + if(descr == NULL) { + printf("Error: pcap_open_live(): %s\n",errbuf); + exit(1); + } - // Lets try and compile the program, optimized - if(pcap_compile(descr, &fp, string_filter, 1, maskp) == -1) { - fprintf(stderr,"Error: calling pcap_compile\n"); - exit(1); - } - // set the compiled program as the filter - if(pcap_setfilter(descr,&fp) == -1) { - fprintf(stderr,"Error: setting filter\n"); - exit(1); - } + // Lets try and compile the program, optimized + if(pcap_compile(descr, &fp, string_filter, 1, maskp) == -1) { + fprintf(stderr,"Error: calling pcap_compile\n"); + exit(1); + } + // set the compiled program as the filter + if(pcap_setfilter(descr,&fp) == -1) { + fprintf(stderr,"Error: setting filter\n"); + exit(1); + } - /* - if (pcap_setnonblock(descr, 1, errbuf) == -1){ - printf("Error: pcap_setnonblock(): %s\n",errbuf); - exit(1); + /* + if (pcap_setnonblock(descr, 1, errbuf) == -1){ + printf("Error: pcap_setnonblock(): %s\n",errbuf); + exit(1); + } + */ + } + else + { + logWrite(MAIN_LOOP, NULL, "Initializing pcap replay interface"); + // We are running offline using data recorded by tcpdump + // earlier. device is the filename. If device == '-', then we + // read from stdin. + descr = pcap_open_offline(device, errbuf); + if(descr == NULL) { + printf("Error: pcap_open_offline(): %s\n", errbuf); + exit(1); + } } - */ - pcapfd = pcap_fileno(descr); + pcapfd = pcap_get_selectable_fd(descr); + if (pcapfd == -1) + { + fprintf(stderr, "Error: pcap file descriptor is not selectable\n"); + exit(1); + } init_sniff_rcvdb(); loss_log = fopen("loss.log", "w"); //loss log diff --git a/pelab/stub/stub.h b/pelab/stub/stub.h index 4be5ba12513eaa0e229b067ee37afe23ee8ae958..2f461f07b7bb6746f2d9722f3f405d3e6b15598b 100644 --- a/pelab/stub/stub.h +++ b/pelab/stub/stub.h @@ -1,11 +1,18 @@ + #ifndef _STUB_H #define _STUB_H +#ifdef __cplusplus +extern "C" +{ +#endif + #include #include #include #include #include +#include #include #include #include @@ -32,7 +39,7 @@ #define PENDING_CONNECTIONS 10 //the pending connections the queue will hold #define CONCURRENT_SENDERS 50 //concurrent senders the stub maintains #define CONCURRENT_RECEIVERS 50 //concurrent receivers the stub maintains -#define MAX_PAYLOAD_SIZE 110000 //size of the traffic payload +#define MAX_PAYLOAD_SIZE 64000 //size of the traffic payload // This is the low water mark of the send buffer. That is, if select // says that a write buffer is writable, this is the minimum amount of @@ -45,19 +52,33 @@ #define SNIFF_WINSIZE 131071 //from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux #define SNIFF_TIMEOUT QUANTA/10 //in msec - //magic numbers #define CODE_BANDWIDTH 0x00000001 #define CODE_DELAY 0x00000002 #define CODE_LOSS 0x00000003 -#define CODE_LIST_SIZE 0x00000004 //indicates the start and size of a list of measurements -#define CODE_LIST_DELAY 0x00000005 //a list member that has a delay value +#define CODE_LIST_DELAY 0x00000004 + +#define MONITOR_RECORD_SIZE (sizeof(long)*3 + sizeof(unsigned short)*3) +#define PACKET_WRITE 1 +#define PACKET_SEND_BUFFER 2 +#define PACKET_RECEIVE_BUFFER 3 + +enum { FAILED_LOOKUP = -1 }; +// The int is the index of the connection. +typedef void (*handle_index)(int); +// The first int is the socket of the monitor. +// The second int is the index of the connection. +// This returns 1 for success and 0 for failure. +typedef int (*send_to_monitor)(int, int); typedef struct { short valid; int sockfd; unsigned long ip; + unsigned short stub_port; + unsigned short source_port; + unsigned short dest_port; time_t last_usetime; //last monitor access time int pending; // How many bytes are pending to this peer? } connection; @@ -88,17 +109,21 @@ typedef struct { extern short flag_debug; extern int pcapfd; -extern char sniff_interface[128]; +extern int maxfd; +extern connection snddb[CONCURRENT_SENDERS]; extern connection rcvdb[CONCURRENT_RECEIVERS]; extern sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS]; extern unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side +extern unsigned long last_delays[CONCURRENT_RECEIVERS]; extern loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side +extern unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion extern delay_record delay_records[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side -extern int search_rcvdb(unsigned long indexip); extern void sniff(void); -extern void init_pcap(int to_ms); +extern void init_pcap(int to_ms, unsigned short port, char * device, + int is_live); extern void append_delay_sample(int path_id, long sample_value); +void clean_exit(int); typedef struct { @@ -118,12 +143,86 @@ extern unsigned int throughputTick(ThroughputAckState * state); extern void throughputInit(ThroughputAckState * state, unsigned int sequence); extern unsigned int bytesThisTick(ThroughputAckState * state); -#endif +// Add a potential sender to the pool. +void add_empty_sender(int index); + +// Initialize a receiver or sender connection. +void init_connection(connection * conn); + +// Run function on the index of every valid sender that is readable. +void for_each_readable_sender(handle_index function, fd_set * read_fds_copy); + +// Try to find the sender by the IP address and the source port of the +// actual connection being created by the seinding peer. If there is a +// sender, the socket is closed and replaced with the sockfd +// argument. If the sender cannot be found, create a new sender based +// on the sockfd argument. +// Returns FAILED_LOOKUP if the sender cannot be found and there are +// no empty slots. +int replace_sender_by_stub_port(unsigned long ip, unsigned short stub_port, + int sockfd, fd_set * read_fds); +// Remove the index from the database, invalidating it. +void remove_sender_index(int index, fd_set * read_fds); +// Add a potential receiver to the pool. +void add_empty_receiver(int index); +// Run function on the index of each writable receiver which has some +// bytes pending. +void for_each_pending(handle_index function, fd_set * write_fds_copy); +// Run function on each index which will send an update to the +// monitor. Returns 1 for success and 0 for failure. +int for_each_to_monitor(send_to_monitor function, int monitor); +// Find the index of the receiver based on the IP address of the +// receiver, and the source and destination ports of the Emulab +// connection. +// Returns FAILED_LOOKUP on failure or the index of the receiver. +int find_by_address(unsigned long ip, unsigned short source_port, + unsigned short dest_port); +// Find the index of the receiver based on the above criteria. If this +// fails, create a new connection to the IP address. +// Returns FAILED_LOOKUP on failure or the index of the receiver. +// Failure happens when the receiver is not already in the database +// and there are no more empty slots in the database. +int insert_by_address(unsigned long ip, unsigned short source_port, + unsigned short dest_port); +// Insert a fake entry for purely monitoring purposes. +int insert_fake(unsigned long ip, unsigned short port); + +// Reconnect to a receiver. Resets the socket, the stub_port, and the +// sniff records. +void reconnect_receiver(int index); + + +// Reset a receiver to point to the ip source_port and dest_port +// specified. Note that this does not change the socket or stub_port +// at all (call reconnect for that). Nor does it change the sniff records. +void reset_receive_records(int index, unsigned long ip, + unsigned short source_port, + unsigned short dest_port); + +// Find the index of the receiver based on the IP address and the +// source port number of the stub connection. +// Returns FAILED_LOOKUP on failure. +int find_by_stub_port(unsigned long ip, unsigned short stub_port); + +// Put the index into the pending category. +void set_pending(int index, fd_set * write_fds); + +// Remove the index from the pending category. +void clear_pending(int index, fd_set * write_fds); + +// Remove a receiver from the database, invalidating it. +void remove_index(int index, fd_set * write_fds); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/pelab/stub/stubd.c b/pelab/stub/stubd.c index 30822dc863e7d9abde70632efc2e1e2311804aca..9de4800a5f807c118945ba286bb665028726df3a 100644 --- a/pelab/stub/stubd.c +++ b/pelab/stub/stubd.c @@ -6,8 +6,8 @@ * ****************************************************************************/ - #include "stub.h" +#include "log.h" /* * For getopt() @@ -19,12 +19,10 @@ extern int optopt; extern int opterr; extern int optreset; -void clean_exit(int); void append_delay_sample(int path_id, long sample_value); //Global short flag_debug, flag_standalone; -char sniff_interface[128]; connection rcvdb[CONCURRENT_RECEIVERS]; unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side unsigned long last_delays[CONCURRENT_RECEIVERS]; @@ -57,7 +55,10 @@ typedef struct { unsigned long ip; long delta; - long size; + unsigned short type; + long value; + unsigned short source_port; + unsigned short dest_port; } packet_info; packet_buffer_node * packet_buffer_head; @@ -122,11 +123,35 @@ packet_info packet_buffer_front(void) else { char * base = packet_buffer_head->buffer + packet_buffer_index; + + // Get ip memcpy(&result.ip, base, SIZEOF_LONG); - memcpy(&result.delta, base + SIZEOF_LONG, SIZEOF_LONG); + base += SIZEOF_LONG; + + // Get source port + memcpy(&result.source_port, base, sizeof(result.source_port)); + result.source_port = ntohs(result.source_port); + base += sizeof(result.source_port); + + // Get dest port + memcpy(&result.dest_port, base, sizeof(result.dest_port)); + result.dest_port = ntohs(result.dest_port); + base += sizeof(result.dest_port); + + // Get delta time + memcpy(&result.delta, base, SIZEOF_LONG); result.delta = ntohl(result.delta); - memcpy(&result.size, base + SIZEOF_LONG + SIZEOF_LONG, SIZEOF_LONG); - result.size = ntohl(result.size); + base += SIZEOF_LONG; + + // Get value + memcpy(&result.value, base, SIZEOF_LONG); + result.value = ntohl(result.value); + base += SIZEOF_LONG; + + // Get type + memcpy(&result.type, base, sizeof(result.type)); + result.type = ntohs(result.type); + base += sizeof(result.type); } return result; } @@ -140,7 +165,7 @@ int packet_buffer_more(void) // Move to the next packet, cleaning up as we go. void packet_buffer_advance(void) { - packet_buffer_index += 3*SIZEOF_LONG; + packet_buffer_index += MONITOR_RECORD_SIZE; if (packet_buffer_index >= packet_buffer_head->size) { packet_buffer_node * old_head = packet_buffer_head; @@ -155,6 +180,13 @@ void packet_buffer_advance(void) } } +char * ipToString(unsigned long ip) +{ + struct in_addr address; + address.s_addr = ip; + return inet_ntoa(address); +} + void init_random_buffer(void) { int i = 0; @@ -165,18 +197,6 @@ void init_random_buffer(void) } } -void copy_ulong(char **buf_ptr, ulong value, int flag_hton) { - ulong tmpulong; - - if (flag_hton) { - tmpulong = htonl(value); - } else { - tmpulong = value; - } - memcpy(*buf_ptr, &tmpulong, SIZEOF_LONG); - (*buf_ptr) += SIZEOF_LONG; -} - //Append a delay sample to the tail of the ith delay-record queue. void append_delay_sample(int path_id, long sample_value) { delay_sample *sample = malloc(sizeof(delay_sample)); @@ -197,62 +217,57 @@ void append_delay_sample(int path_id, long sample_value) { delay_records[path_id].sample_number++; } -//Remove and return the delay samples. -//Input: path_id. -//Output: size. -//Return: sample_buffer -char *remove_delay_samples(int path_id, int *size) { - int i, number, msecs; - unsigned long interval; - char *sample_buffer, *buf_ptr; - struct timeval *last_tvp, *this_tvp; - delay_sample *samp_ptr; - - number = delay_records[path_id].sample_number; - if (number == 0) return NULL; - - *size = 3 * SIZEOF_LONG * (number+1); - sample_buffer = (char *) malloc(*size); - if (sample_buffer == NULL) { - perror("allocate"); - clean_exit(1); +//Copy delay samples into the given buffer. +//Input: path_id, maximum count +//Output: buffer to write samples into +//Return: void +void save_delay_samples(int path_id, char * buffer, int maxCount) { + char *pos = buffer; + delay_sample *current = delay_records[path_id].head; + delay_sample * previous = NULL; + unsigned long interval = 0; + int count = 1; + + current = delay_records[path_id].head; + interval = 0; + count = 1; + while (current != NULL && count < maxCount) + { + // Format: interval, value + unsigned long tmp = htonl(interval); + memcpy(pos, &tmp, sizeof(unsigned long)); + pos += sizeof(unsigned long); + + tmp = htonl(current->value); + memcpy(pos, &tmp, sizeof(unsigned long)); + pos += sizeof(unsigned long); + + previous = current; + current = current->next; + if (current != NULL) + { + unsigned long msecs = (current->time.tv_usec + - previous->time.tv_usec)/1000; + interval = (current->time.tv_sec - previous->time.tv_sec)*1000 + msecs; + } + ++count; } - buf_ptr = sample_buffer; - - //copy list size into the buffer - //format: ip, type, value - copy_ulong(&buf_ptr, rcvdb[path_id].ip, 0); //the receiver ip - copy_ulong(&buf_ptr, CODE_LIST_SIZE, 1); - copy_ulong(&buf_ptr, number, 1); - - //copy the first delay into the buffer - //format: interval, type, value - samp_ptr = delay_records[path_id].head; - copy_ulong(&buf_ptr, 0L, 1); //zero interval - copy_ulong(&buf_ptr, CODE_LIST_DELAY, 1); - copy_ulong(&buf_ptr, samp_ptr->value, 1); - last_tvp = &(samp_ptr->time); - - //copy the following delays - for (i=1; inext; - this_tvp = &(samp_ptr->time); - msecs = floor((this_tvp->tv_usec-last_tvp->tv_usec)/1000.0+0.5); - interval = (this_tvp->tv_sec-last_tvp->tv_sec)*1000 + msecs; - copy_ulong(&buf_ptr, interval, 1); //interval in msecs - copy_ulong(&buf_ptr, CODE_LIST_DELAY, 1); - copy_ulong(&buf_ptr, samp_ptr->value, 1); - last_tvp = &(samp_ptr->time); - free(delay_records[path_id].head); //release the i-1 delay_sample - delay_records[path_id].head = samp_ptr; //use head as the pre_ptr +} + +void remove_delay_samples(int path_id) +{ + while (delay_records[path_id].head != NULL) + { + delay_sample * tmp = delay_records[path_id].head; + free(delay_records[path_id].head); + delay_records[path_id].head = tmp; } - free(delay_records[path_id].head); //release the last delay_sample delay_records[path_id].head = NULL; delay_records[path_id].tail = NULL; delay_records[path_id].sample_number = 0; - return sample_buffer; } +/* //Initialize or reset state varialbes related to a receiver connection void reset_rcv_entry(int i) { rcvdb[i].valid = 0; @@ -262,19 +277,31 @@ void reset_rcv_entry(int i) { delays[i]=0; last_delays[i]=0; } - +*/ void init(void) { int i; - for (i=0; i maxfd) { maxfd = sockfd; } - dbindex=insert_db(destaddr, sockfd, 0); //insert rcvdb + //insert rcvdb + dbindex=insert_db(destaddr, source_port, dest_port, sockfd, 0); } return dbindex; } +*/ void remove_pending(int index) { if (rcvdb[index].pending == 0) { - FD_CLR(rcvdb[index].sockfd, &write_fds); + clear_pending(index, &write_fds); } } @@ -393,37 +433,33 @@ void add_pending(int index, int size) { if (rcvdb[index].pending == 0 && size > 0) { - FD_SET(rcvdb[index].sockfd, &write_fds); + set_pending(index, &write_fds); } rcvdb[index].pending += size; } -void try_pending(int index, fd_set * write_fds_copy) +void try_pending(int index) { - if (rcvdb[index].pending > 0 && FD_ISSET(rcvdb[index].sockfd, - write_fds_copy)) + int size = 0; + int error = 0; + if (rcvdb[index].pending > LOW_WATER_MARK) { - int size = 0; - int error = 0; - if (rcvdb[index].pending > LOW_WATER_MARK) - { - size = LOW_WATER_MARK; - } - else - { - size = rcvdb[index].pending; - } - error = send(rcvdb[index].sockfd, random_buffer, size, 0); - if (error == -1) - { - perror("try_pending"); - clean_exit(1); - } - rcvdb[index].pending -= error; - total_size += error; -// printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending); - remove_pending(index); + size = LOW_WATER_MARK; + } + else + { + size = rcvdb[index].pending; + } + error = send(rcvdb[index].sockfd, random_buffer, size, MSG_DONTWAIT); + logWrite(PEER_WRITE, NULL, "Wrote %d pending bytes", error); + if (error == -1) + { + perror("try_pending"); + clean_exit(1); } + rcvdb[index].pending -= error; + total_size += error; + remove_pending(index); } void print_header(char *buf){ @@ -476,64 +512,65 @@ int send_all(int sockfd, char *buf, int size) { } void receive_sender(int i) { - char inbuf[MAX_PAYLOAD_SIZE]; + static char inbuf[MAX_PAYLOAD_SIZE]; - if (recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0)== 0) { //connection closed - snddb[i].valid = 0; //no additional clean-up because no other state varialbe is related - FD_CLR(snddb[i].sockfd, &read_fds); + if (recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0)== 0) { + //connection closed + remove_sender_index(i, &read_fds); } } -void send_receiver(unsigned long destaddr, long size, fd_set * write_fds_copy){ - int index; +void send_receiver(int index, int packet_size, fd_set * write_fds_copy){ int sockfd; int error = 1, retry=0; struct in_addr addr; - index = get_rcvdb_index(destaddr); - sockfd= rcvdb[index].sockfd; + sockfd = rcvdb[index].sockfd; - if (size <= 0) { - size = 1; + if (packet_size <= 0) { + packet_size = 1; } if (rcvdb[index].pending > 0) { - add_pending(index, size); + add_pending(index, packet_size); return; } - if (size > MAX_PAYLOAD_SIZE){ - add_pending(index, size - MAX_PAYLOAD_SIZE); - size = MAX_PAYLOAD_SIZE; + if (packet_size > MAX_PAYLOAD_SIZE){ + add_pending(index, packet_size - MAX_PAYLOAD_SIZE); + packet_size = MAX_PAYLOAD_SIZE; } - error = send(sockfd, random_buffer, size, MSG_DONTWAIT); + error = send(sockfd, random_buffer, packet_size, MSG_DONTWAIT); + logWrite(PEER_WRITE, NULL, "Wrote %d bytes", error); // Handle failed connection while (error == -1 && errno == ECONNRESET && retry < 3) { - // TODO: Think hard about what resetting a connection means for sniffing - // traffic. - +/* //reset the related state variables int pending = rcvdb[index].pending; sniff_rcvdb[index].start = 0; sniff_rcvdb[index].end = 0; throughput[index].isValid = 0; FD_CLR(rcvdb[index].sockfd, &write_fds); + // TODO: Fix redo reset_rcv_entry(index); //try again - index = get_rcvdb_index(destaddr); + index = get_rcvdb_index(packet.ip, packet.source_port, packet.dest_port); rcvdb[index].pending = pending; +*/ + reconnect_receiver(index); sockfd= rcvdb[index].sockfd; - error = send(sockfd, random_buffer, size, MSG_DONTWAIT); + error = send(sockfd, random_buffer, packet_size, MSG_DONTWAIT); + logWrite(PEER_WRITE, NULL, "Wrote %d reconnected bytes", error); retry++; } //if still disconnected, reset if (error == -1 && errno == ECONNRESET) { - rcvdb[index].valid = 0; - addr.s_addr = destaddr; + remove_index(index, &write_fds); + addr.s_addr = rcvdb[index].ip; printf("Error: send_receiver() - failed send to %s three times. \n", inet_ntoa(addr)); } else if (error == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - add_pending(index, size); + add_pending(index, packet_size); } else if (error == -1) { perror("send_receiver: send"); @@ -542,31 +579,77 @@ void send_receiver(unsigned long destaddr, long size, fd_set * write_fds_copy){ else { total_size += error; // printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending); - add_pending(index, size - error); + add_pending(index, packet_size - error); + } +} + +void change_socket_buffer_size(int sockfd, int optname, int value) +{ + int error = setsockopt(sockfd, SOL_SOCKET, optname, &value, sizeof(value)); + if (error == -1) + { + perror("setsockopt"); + clean_exit(1); + } +} + +void process_control_packet(packet_info packet, fd_set * write_fds_copy){ + int index = -1; + int sockfd = -1; + + index = insert_by_address(packet.ip, packet.source_port, packet.dest_port); + if (index == -1) + { + printf("No more connection slots.\n"); + clean_exit(1); + } + sockfd = rcvdb[index].sockfd; + switch(packet.type) + { + case PACKET_WRITE: + logWrite(CONTROL_RECEIVE, NULL, "Told to write %d bytes", packet.value); + send_receiver(index, packet.value, write_fds_copy); + break; + case PACKET_SEND_BUFFER: + logWrite(CONTROL_RECEIVE, NULL, "Told to set send buffer to %d bytes", + packet.value); + change_socket_buffer_size(sockfd, SO_SNDBUF, packet.value); + break; + case PACKET_RECEIVE_BUFFER: + logWrite(CONTROL_RECEIVE, NULL, "Told to set receive buffer to %d bytes", + packet.value); + change_socket_buffer_size(sockfd, SO_RCVBUF, packet.value); + break; + default: + fprintf(stderr, "Unknown control packet code: %d\n", packet.type); + clean_exit(1); } } int receive_monitor(int sockfd, struct timeval * deadline) { - char buf[MAX_PAYLOAD_SIZE]; - char *nextptr; - unsigned long tmpulong, destnum; + char buf[2*SIZEOF_LONG]; + int buffer_size = 0; + unsigned long destnum = 0; char * packet_buffer = NULL; //receive first two longs if (recv_all(sockfd, buf, 2*SIZEOF_LONG)==0) { return 0; } - nextptr = buf+SIZEOF_LONG; - memcpy(&tmpulong, nextptr, SIZEOF_LONG); - destnum = ntohl(tmpulong); - packet_buffer = malloc(destnum*3*SIZEOF_LONG); - + memcpy(&destnum, buf + SIZEOF_LONG, SIZEOF_LONG); + destnum = ntohl(destnum); //return success if no dest addr is given if (destnum == 0){ return 1; } + + logWrite(CONTROL_RECEIVE, NULL, "Received %d control records", destnum); + + buffer_size = (int)(destnum * MONITOR_RECORD_SIZE); + packet_buffer = malloc(buffer_size); + //otherwise, receive dest addrs - if (recv_all(sockfd, packet_buffer, destnum*3*SIZEOF_LONG)==0) { + if (recv_all(sockfd, packet_buffer, buffer_size)==0) { free(packet_buffer); return 0; } @@ -574,95 +657,226 @@ int receive_monitor(int sockfd, struct timeval * deadline) { { gettimeofday(deadline, NULL); } - packet_buffer_add(packet_buffer, destnum*3*SIZEOF_LONG); - -// nextptr=buf; -// for (i=0; i (long)(last_delays[index]/5)) { + // Insert the address info + char * buf = save_receiver_address(outbuf_delay, index); + + logWrite(CONTROL_SEND, NULL, "Sending delay(%d) about stream(%hu:%s:%hu)", + delays[index], ipToString(rcvdb[index].ip)); + + // Insert the code number for delay + tmpulong = htonl(CODE_DELAY); + memcpy(buf, &tmpulong, SIZEOF_LONG); + buf += SIZEOF_LONG; + + // Insert the delay value + tmpulong = htonl(delays[index]); + memcpy(buf, &tmpulong, SIZEOF_LONG); + buf += SIZEOF_LONG; + + if (send_all(monitor, outbuf_delay, buffer_size) == 0){ + return 0; + } + last_delays[index] = delays[index]; + logWrite(CONTROL_SEND, NULL, "Sending delay success"); - for (i=0; i 0) + { + char * buf = save_receiver_address(outbuf, index); + + logWrite(CONTROL_SEND, NULL, + "Sending delay list of size(%d) to monitor(%s)", + delay_records[index].sample_number, ipToString(rcvdb[index].ip)); + + memcpy(buf, &code, SIZEOF_LONG); + buf += SIZEOF_LONG; + + memcpy(buf, &count, SIZEOF_LONG); + buf += SIZEOF_LONG; + + save_delay_samples(index, buf, maxSend); + + error = send_all(monitor, outbuf, sending_size) == 0; + logWrite(CONTROL_SEND, NULL, + "Sending delay list finished with code(%d)", error); + } + + remove_delay_samples(index); + + return error; +} + +int send_monitor(int sockfd) { + int result = 1; + + if (result == 1) { + result = for_each_to_monitor(send_delay_to_monitor, sockfd); + } + if (result == 1) { + result = for_each_to_monitor(send_bandwidth_to_monitor, sockfd); + } +// if (result == 1) { +// result = for_each_to_monitor(send_loss_to_monitor, sockfd); +// } +// if (result == 1) { +// result = for_each_to_monitor(send_delay_list_to_monitor, sockfd); +// } + return result; +} + void print_measurements(void) { float loss_rate; int i; @@ -739,7 +953,7 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy) // printf("Sending packet to %s of size %ld\n", inet_ntoa(debug_temp), // packet.size); - send_receiver(packet.ip, packet.size, write_fds_copy); + process_control_packet(packet, write_fds_copy); packet_buffer_advance(); if (packet_buffer_more()) @@ -774,6 +988,20 @@ int have_time(struct timeval *start_tvp, struct timeval *left_tvp){ void usage() { fprintf(stderr,"Usage: stubd [-t] [-d] [-s] [remote_IPaddr]\n"); fprintf(stderr," -d: Enable debugging mode\n"); + fprintf(stderr," -f : Save logs into filename. By default, stderr is used."); + fprintf(stderr," -l