Commit 3a043935 authored by Jonathon Duerig's avatar Jonathon Duerig

Added support for queue-length-in-time. Fixed a bug where unusually small...

Added support for queue-length-in-time. Fixed a bug where unusually small bandwidth measurements were sent during slow start.
parent dff9ce53
......@@ -18,6 +18,7 @@ CODE_BANDWIDTH = 1
CODE_DELAY = 2
CODE_LOSS = 3
CODE_LIST_DELAY = 4
CODE_MAX_DELAY = 5
PACKET_WRITE = 1
PACKET_SEND_BUFFER = 2
......@@ -206,6 +207,9 @@ def receive_characteristic(conn):
buffer = conn.recv(value*8)
# Dummynet isn't quite set up to deal with this yet, so ignore it.
sys.stdout.write('Ignoring delay list of size: ' + str(value) + '\n')
elif command == CODE_MAX_DELAY:
sys.stdout.write('Max Delay: ' + str(value) + '\n');
set_max_delay(value, dest);
else:
sys.stdout.write('Other: ' + str(command) + ', ' + str(value) + '\n');
return True
......@@ -238,6 +242,12 @@ def set_delay(milliseconds, dest):
def set_loss(probability, dest):
return set_link(this_ip, dest, 'plr=' + str(probability))
def set_max_delay(delay, dest):
hertz = 10000.0
milliseconds = 1000.0
return set_link(this_ip, dest, 'MAXINQ=' + str(int(
(hertz/milliseconds)*delay )))
def set_link(source, dest, ending):
command = ('/usr/testbed/bin/tevc -e ' + this_experiment + ' now '
+ emulated_to_interface[source] + ' modify dest=' + dest + ' '
......
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 delay=5
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 delay=5
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 bandwidth=10000
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 bandwidth=10000
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 bandwidth=$2
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 bandwidth=$2
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 bandwidth=1500
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 bandwidth=1500
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 lpr=0.0
#/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 bandwidth=1500
#/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 bandwidth=1500
#/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 lpr=0.0
#/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 lpr=0.0
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify dest=10.4.0.1 delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify dest=10.1.0.1 delay=$1
......
......@@ -257,6 +257,8 @@ int insert_fake(unsigned long ip, unsigned short port)
throughput[index].isValid = 0;
last_through[index] = 0;
buffer_full[index] = 0;
max_delay[index] = 0;
last_max_delay[index] = 0;
loss_records[index].loss_counter=0;
loss_records[index].total_counter=0;
last_loss_rates[index]=0;
......@@ -335,11 +337,16 @@ void reconnect_receiver(int index)
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
last_through[index] = 0;
buffer_full[index] = 0;
max_delay[index] = 0;
last_max_delay[index] = 0;
loss_records[index].loss_counter=0;
loss_records[index].total_counter=0;
last_loss_rates[index]=0;
delays[index]=0;
last_delays[index]=0;
delay_count[index]=0;
max_throughput[index] = 0;
base_rtt[index] = LONG_MAX;
// remove_delay_samples(index);
......
......@@ -458,6 +458,30 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
if (record_id != -1) { //new ack received
int delay = 0;
struct tcp_info info;
int info_size = sizeof(info);
int error = 0;
if (is_live)
{
error = getsockopt(rcvdb[path_id].sockfd,
SOL_TCP, TCP_INFO, &info,
&info_size);
if (error == -1)
{
perror("getsockopt() TCP_INFO");
clean_exit(1);
}
if (info.tcpi_snd_cwnd < info.tcpi_snd_ssthresh
&& bandwidth_method == BANDWIDTH_BUFFER)
{
// We are in slow start. This means that even if the
// socket buffer is full, we are still not sending
// at the ABW rate.
buffer_full[path_id] = 0;
last_through[path_id] = INT_MAX;
logWrite(DELAY_DETAIL, NULL, "Buffer Clear");
}
}
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5);
delay = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
......@@ -467,21 +491,18 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
(delay_count[path_id])++;
if (delay < base_rtt[path_id])
{
base_rtt[path_id] = delay;
base_rtt[path_id] = delay;
}
logWrite(DELAY_DETAIL, NULL, "delay: %d, max_delay: %d",
delay, max_delay[path_id]);
if (delay > max_delay[path_id])
{
max_delay[path_id] = delay + delay/10 + 1;
}
if (is_live && delay_count[path_id] > 0
&& bandwidth_method == BANDWIDTH_VEGAS)
{
int bandwidth = 0;
int info_size = sizeof(info);
int error = getsockopt(rcvdb[path_id].sockfd,
SOL_TCP, TCP_INFO, &info,
&info_size);
if (error == -1)
{
perror("getsockopt() TCP_INFO");
clean_exit(1);
}
bandwidth = (info.tcpi_unacked/*tcpi_snd_cwnd*/
* info.tcpi_snd_mss * 8)
/ base_rtt[path_id];
......
......@@ -52,6 +52,7 @@ extern "C"
#define CODE_DELAY 0x00000002
#define CODE_LOSS 0x00000003
#define CODE_LIST_DELAY 0x00000004
#define CODE_MAX_DELAY 0x00000005
//magic numbers for alternative algorithms
#define BANDWIDTH_AVERAGE 0
......@@ -174,6 +175,9 @@ extern int buffer_full[CONCURRENT_RECEIVERS];
extern unsigned long max_throughput[CONCURRENT_RECEIVERS];
extern unsigned long base_rtt[CONCURRENT_RECEIVERS];
extern int max_delay[CONCURRENT_RECEIVERS];
extern int last_max_delay[CONCURRENT_RECEIVERS];
extern void sniff(void);
extern void init_pcap(int to_ms, unsigned short port, char * device);
extern void append_delay_sample(int path_id, long sample_value,
......
......@@ -32,6 +32,8 @@ loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the send
unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
int last_through[CONCURRENT_RECEIVERS];
int buffer_full[CONCURRENT_RECEIVERS];
int max_delay[CONCURRENT_RECEIVERS];
int last_max_delay[CONCURRENT_RECEIVERS];
int flag_testmode=0;
int bandwidth_method = BANDWIDTH_VEGAS;
enum {TEST_NOTTESTING, TEST_NOTSTARTED, TEST_RUNNING, TEST_DONE } test_state;
......@@ -814,86 +816,102 @@ char * save_receiver_address(char * buf, int index)
return buf;
}
int send_delay_to_monitor(int monitor, int index)
int send_simple_record_to_monitor(int monitor, int index, unsigned long type,
unsigned long value)
{
int buffer_size = 3*SIZEOF_LONG + 2*sizeof(unsigned short);
char outbuf_delay[buffer_size];
unsigned long delay = 0;
unsigned long tmpulong;
// delay = delays[index];
// if (delay_count[index] > 0)
// {
// delay /= delay_count[index];
// }
// else
// {
// delay = last_delays[index];
// }
// Insert the address info
char * buf = save_receiver_address(outbuf_delay, index);
delay = base_rtt[index];
if (delay_count[index] == 0)
{
delay = last_delays[index];
}
logWrite(CONTROL_SEND, NULL,
"Sending type(%d) value(%d) about stream(%hu:%s:%hu)",
type, value, rcvdb[index].source_port,
ipToString(rcvdb[index].ip), rcvdb[index].dest_port);
// If measurement changed since last send
// if (abs((long)delays[index] - (long)last_delays[index])
// > (long)(last_delays[index]/5)) {
if (delay != last_delays[index])
{
// Insert the address info
char * buf = save_receiver_address(outbuf_delay, index);
// Insert the code number for delay
type = htonl(type);
memcpy(buf, &type, SIZEOF_LONG);
buf += SIZEOF_LONG;
logWrite(CONTROL_SEND, NULL, "Sending delay(%d) about stream(%hu:%s:%hu)",
delay, rcvdb[index].source_port,
ipToString(rcvdb[index].ip), rcvdb[index].dest_port);
// Insert the delay value
value = htonl(value);
memcpy(buf, &value, SIZEOF_LONG);
buf += SIZEOF_LONG;
// Insert the code number for delay
tmpulong = htonl(CODE_DELAY);
memcpy(buf, &tmpulong, SIZEOF_LONG);
buf += SIZEOF_LONG;
return send_all(monitor, outbuf_delay, buffer_size) != 0;
}
// Insert the delay value
tmpulong = htonl(delay);
memcpy(buf, &tmpulong, SIZEOF_LONG);
buf += SIZEOF_LONG;
int send_max_delay_to_monitor(int monitor, int index)
{
int result = 1;
int delay = 0;
if (send_all(monitor, outbuf_delay, buffer_size) == 0){
return 0;
delay = max_delay[index];
if (delay == 0 || buffer_full[index] == 0)
{
delay = last_max_delay[index];
}
if (delay != last_max_delay[index])
{
result = send_simple_record_to_monitor(monitor, index, CODE_MAX_DELAY,
delay);
if (result)
{
logWrite(CONTROL_SEND, NULL, "Successfully sent max_delay measurement");
}
}
last_max_delay[index] = delay;
max_delay[index] = 0;
return result;
}
int send_delay_to_monitor(int monitor, int index)
{
int result = 1;
unsigned long delay = 0;
logWrite(CONTROL_SEND, NULL, "Sending delay success");
delay = base_rtt[index];
if (delay_count[index] == 0)
{
delay = last_delays[index];
}
if (delay != last_delays[index])
{
result = send_simple_record_to_monitor(monitor, index, CODE_DELAY, delay);
if (result)
{
static struct timeval earlier = {0, 0};
struct timeval now;
gettimeofday(&now, NULL);
if (earlier.tv_sec != 0)
{
logWrite(TCPTRACE_SEND, NULL, "RTT!orange");
logWrite(TCPTRACE_SEND, NULL, "RTT!line %.6f %d %.6f %d",
earlier.tv_sec + earlier.tv_usec/1000000000.0,
last_delays[index],
now.tv_sec + now.tv_usec/1000000000.0,
last_delays[index]);
}
static struct timeval earlier = {0, 0};
struct timeval now;
logWrite(CONTROL_SEND, NULL, "Successfully sent delay measurement");
gettimeofday(&now, NULL);
if (earlier.tv_sec != 0)
{
logWrite(TCPTRACE_SEND, NULL, "RTT!orange");
logWrite(TCPTRACE_SEND, NULL, "RTT!line %.6f %d %.6f %d",
now.tv_sec + now.tv_usec/1000000000.0,
earlier.tv_sec + earlier.tv_usec/1000000000.0,
last_delays[index],
now.tv_sec + now.tv_usec/1000000000.0,
delay);
earlier = now;
last_delays[index]);
}
logWrite(TCPTRACE_SEND, NULL, "RTT!orange");
logWrite(TCPTRACE_SEND, NULL, "RTT!line %.6f %d %.6f %d",
now.tv_sec + now.tv_usec/1000000000.0,
last_delays[index],
now.tv_sec + now.tv_usec/1000000000.0,
delay);
earlier = now;
}
// printf("Sent delay: %ld\n", delays[i]);
last_delays[index] = delay;
delays[index] = 0;
delay_count[index] = 0;
base_rtt[index] = LONG_MAX;
}
last_delays[index] = delay;
delays[index] = 0;
delay_count[index] = 0;
base_rtt[index] = LONG_MAX;
return 1;
return result;
}
int send_bandwidth_to_monitor(int monitor, int index)
......@@ -910,7 +928,7 @@ int send_bandwidth_to_monitor(int monitor, int index)
bandwidth = max_throughput[index];
} else if (bandwidth_method == BANDWIDTH_BUFFER) {
bandwidth = throughputTick(&throughput[index]);
if (buffer_full[index] != 1 && bandwidth <= last_through[index])
if (buffer_full[index] == 0 && bandwidth <= last_through[index])
{
bandwidth = last_through[index];
}
......@@ -1051,6 +1069,9 @@ int send_monitor(int sockfd) {
// if (result == 1) {
// result = for_each_to_monitor(send_delay_list_to_monitor, sockfd);
// }
if (result == 1) {
result = for_each_to_monitor(send_max_delay_to_monitor, sockfd);
}
return result;
}
......@@ -1169,6 +1190,7 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy)
{
// There are bytes that were unwritten.
buffer_full[index] = 1;
logWrite(DELAY_DETAIL, NULL, "Buffer Full");
}
++write_buffer_index;
......@@ -1210,7 +1232,7 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy)
}
if (delta <= 0)
{
fprintf(stderr, "Delta is below 0! delta: %d, packet.delta %d"
fprintf(stderr, "Delta is below 0! delta: %d, packet.delta %ld"
", write_delta_total: %d\n",
delta, packet.delta, write_delta_total);
clean_exit(1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment