Commit 86992f2c authored by Jonathon Duerig's avatar Jonathon Duerig

Added a capped pending buffer which retains write and timing information....

Added a capped pending buffer which retains write and timing information. Minor reporting fixes. When I checked out start-experiment, it said that there was an RCS version and merged the changes into my version. Therefore checking in this version should be correct. Mike should doublecheck.
parent da9a2e9c
sh instrument.sh ../iperf -i 0.5 -c elab1 -t 30
sh instrument.sh ../iperf -i 0.5 -c elab1 -t 30 -w 256k
#sh instrument.sh ../iperf -c elab1 -t 30
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 delay=5
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 delay=5
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 bandwidth=100000
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 bandwidth=100000
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab0 modify dest=10.0.0.2 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab now elabc-elab1 modify dest=10.0.0.1 lpr=0.0
#/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 delay=5
#/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 delay=5
#/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 bandwidth=10000
#/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 bandwidth=10000
#/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet0 modify dest=10.1.0.2 lpr=0.0
#/usr/testbed/bin/tevc -e tbres/pelab now planetc-planet1 modify dest=10.1.0.1 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify dest=10.4.0.1 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify dest=10.1.0.1 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify dest=10.4.0.1 bandwidth=$2
/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify dest=10.1.0.1 bandwidth=$2
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 delay=5
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 delay=5
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 bandwidth=10000
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 bandwidth=10000
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-1 modify dest=10.0.0.2 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab-generated now elabc-elab-2 modify dest=10.0.0.1 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 delay=$1
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 bandwidth=1500
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 bandwidth=1500
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-1 modify dest=10.1.0.2 lpr=0.0
/usr/testbed/bin/tevc -e tbres/pelab-generated now plabc-plab-2 modify dest=10.1.0.1 lpr=0.0
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify dest=10.4.0.1 delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify dest=10.1.0.1 delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify dest=10.4.0.1 bandwidth=$2
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify dest=10.1.0.1 bandwidth=$2
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify dest=10.5.0.1 delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify dest=10.2.0.1 delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify dest=10.5.0.1 bandwidth=$2
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify dest=10.2.0.1 bandwidth=$2
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify delay=$1
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router0 modify bandwidth=$2
#/usr/testbed/bin/tevc -e tbres/pelab now rlink-router1 modify bandwidth=$2
......@@ -53,7 +53,7 @@ if (defined($options{"s"})) {
if ($stub_cmdargs eq $UNKNOWN) {
$stub_cmdargs = $options{"s"};
} else {
$stub_cmdargs .= " " . $options{"s"};
$stub_cmdargs = " " . $options{"s"};
}
}
if (defined($options{"M"})) {
......
......@@ -87,8 +87,8 @@ static void logPrefix(int flags, struct timeval const * timestamp)
gettimeofday(&now, NULL);
timeptr = &now;
}
fprintf(logFile, "%d.%d ", (int)(timeptr->tv_sec),
(int)((timeptr->tv_usec)/1000));
fprintf(logFile, "%f ", (double)(timeptr->tv_sec) +
((timeptr->tv_usec)/1000)/1000.0);
}
fprintf(logFile, ": ");
}
......
......@@ -59,7 +59,7 @@ void init_connection(connection * conn)
conn->source_port = 0;
conn->dest_port = 0;
conn->last_usetime = time(NULL);
conn->pending = 0;
conn->pending.is_pending = 0;
}
}
......@@ -100,7 +100,7 @@ int replace_sender_by_stub_port(unsigned long ip, unsigned short stub_port,
// There is already a connection. Get rid of the old connection.
int index = pos->second;
snddb[index].last_usetime = now;
snddb[index].pending = 0;
snddb[index].pending.is_pending = 0;
if (snddb[index].sockfd != -1)
{
FD_CLR(snddb[index].sockfd, read_fds);
......@@ -134,7 +134,7 @@ int replace_sender_by_stub_port(unsigned long ip, unsigned short stub_port,
}
snddb[index].sockfd = sockfd;
snddb[index].last_usetime = now;
snddb[index].pending = 0;
snddb[index].pending.is_pending = 0;
result = index;
}
return result;
......@@ -382,7 +382,6 @@ void clear_pending(int index, fd_set * write_fds)
{
pending_receivers.erase(index);
FD_CLR(rcvdb[index].sockfd, write_fds);
rcvdb[index].pending = 0;
}
}
......@@ -403,6 +402,7 @@ void remove_index(int index, fd_set * write_fds)
close(rcvdb[index].sockfd);
rcvdb[index].sockfd = -1;
}
rcvdb[index].pending.is_pending = 0;
}
}
......@@ -482,7 +482,8 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
perror("getsockopt() TCP_INFO");
clean_exit(1);
}
bandwidth = (info.tcpi_snd_cwnd * info.tcpi_snd_mss * 8)
bandwidth = (info.tcpi_unacked/*tcpi_snd_cwnd*/
* info.tcpi_snd_mss * 8)
/ base_rtt[path_id];
if (bandwidth > max_throughput[path_id])
{
......@@ -490,8 +491,9 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
}
logWrite(DELAY_DETAIL, NULL,
"Kernel RTT: %lu, Kernel Losses: %lu, "
"Receive Window: %lu",
info.tcpi_rtt, info.tcpi_lost, tp->window);
"Receive Window: %lu, Kernel packets out: %lu",
info.tcpi_rtt, info.tcpi_lost, tp->window,
info.tcpi_unacked);
logWrite(DELAY_DETAIL, NULL,
"Tput: %lu, cwnd: %lu, snd_MSS: %lu bytes, "
"Base RTT: %lu",
......
......@@ -77,6 +77,49 @@ typedef void (*handle_index)(int);
// This returns 1 for success and 0 for failure.
typedef int (*send_to_monitor)(int, int);
// Information about each write that is going to happen. One of these
// is stored in every slot of the 'writes' queue of a pending_list.
typedef struct
{
// Number of bytes to write. After a partial write this is replaced
// with the number of bytes that still remain to be sent.
long size;
// delta is a time difference in milliseconds. It is the gap between
// the time requested for this write and the time requested for the
// write before it (0 for the first write placed in a list).
long delta;
} pending_write;
// The total number of writes we will queue up before discarding.
// This is the capacity of the circular 'writes' array in pending_list.
enum { PENDING_SIZE = 40 };
// The data structure which holds the writes pending for a particular
// connection. The writes are kept in a fixed-size circular queue of
// PENDING_SIZE slots together with the timing needed to replay them.
typedef struct
{
// This should start out false. If it is false, then the rest of the
// data in this struct is undefined.
int is_pending;
// When is the earliest moment at which we should try a write?
// A pending write is only attempted once the current time is
// strictly later than this.
struct timeval deadline;
// When did the last write occur? Used to determine inter-write times,
// i.e. the 'delta' stored with each queued write.
struct timeval last_write;
// The list of the actual writes themselves. This is a circular
// queue. When it runs out of room it overwrites the oldest pending
// write.
pending_write writes[PENDING_SIZE];
// The index of the current write under consideration. This is the
// oldest write still queued and the next one to be attempted.
int current_index;
// The index of the next free slot. This may be the same as
// 'current_index'. If this is so, then the write indexed by
// 'current_index' is the oldest write pending and is to be
// overridden if another write comes along.
int free_index;
} pending_list;
// Initializes the pending write structure of connection 'index' with a
// single write of 'size' bytes whose deadline is 'time'. Does nothing
// if that connection already has pending writes.
void init_pending_list(int index, long size, struct timeval time);
// Adds a pending write onto the tail of the list of connection 'index'.
// Does nothing unless the list is already marked as pending. If the
// queue is full, the oldest pending write is discarded to make room.
void push_pending_write(int index, pending_write current);
// Removes the oldest pending write of connection 'index'. If that
// empties the queue the list is marked as no longer pending; otherwise
// the deadline is moved forward by the delta of the next write.
void pop_pending_write(int index);
typedef struct {
short valid;
int sockfd;
......@@ -85,7 +128,7 @@ typedef struct {
unsigned short source_port;
unsigned short dest_port;
time_t last_usetime; //last monitor access time
int pending; // How many bytes are pending to this peer?
pending_list pending; // What writes are pending?
} connection;
typedef struct {
struct timeval captime;
......
......@@ -314,45 +314,138 @@ void clean_exit(int code){
exit(code);
}
void remove_pending(int index)
int send_with_reconnect(int index, int size);
void try_pending(int index)
{
if (rcvdb[index].pending == 0)
pending_list * pending = &rcvdb[index].pending;
struct timeval now;
gettimeofday(&now, NULL);
if (pending->is_pending
&& (pending->deadline.tv_sec < now.tv_sec ||
(pending->deadline.tv_sec == now.tv_sec
&& pending->deadline.tv_usec < now.tv_usec)))
{
int size = pending->writes[pending->current_index].size;
int error = send_with_reconnect(index, size);
if (error == 0)
{
// Complete success in writing.
pop_pending_write(index);
}
else if (error > 0)
{
// Partial success in writing.
pending->writes[pending->current_index].size = error;
}
else
{
clear_pending(index, &write_fds);
// Disconnected and cannot reconnect.
// Do nothing. We don't care about this connection anymore.
}
}
}
void add_pending(int index, int size)
void init_pending_list(int index, long size, struct timeval time)
{
if (rcvdb[index].pending == 0 && size > 0)
{
set_pending(index, &write_fds);
}
rcvdb[index].pending += size;
pending_list * pending = &(rcvdb[index].pending);
if (! pending->is_pending)
{
pending->is_pending = 1;
pending->deadline = time;
pending->last_write = time;
pending->writes[0].size = size;
pending->writes[0].delta = 0;
pending->current_index = 0;
pending->free_index = 1;
set_pending(index, &write_fds);
}
}
void try_pending(int index)
void push_pending_write(int index, pending_write current)
{
int size = 0;
int error = 0;
if (rcvdb[index].pending > LOW_WATER_MARK)
pending_list * pending = &(rcvdb[index].pending);
if (pending->is_pending)
{
int used_buffer = 0;
if (pending->free_index < pending->current_index)
{
size = LOW_WATER_MARK;
used_buffer = PENDING_SIZE -
(pending->current_index - pending->free_index);
}
else
{
size = rcvdb[index].pending;
used_buffer = pending->free_index - pending->current_index;
}
error = send(rcvdb[index].sockfd, random_buffer, size, MSG_DONTWAIT);
logWrite(PEER_WRITE, NULL, "Wrote %d pending bytes", error);
if (error == -1)
logWrite(PEER_WRITE, NULL, "Pending insert. Used buffer: %d", used_buffer);
// If free_index is equal to current_index, then we are out of
// space and we want to delete the cell at current_index.
if (pending->free_index == pending->current_index)
{
perror("try_pending");
clean_exit(1);
// pop_pending_write(index);
// pending->current_index = (pending->current_index + PENDING_SIZE/2)
// % PENDING_SIZE;
int delta = pending->writes[(pending->current_index + 1)
%PENDING_SIZE].delta
- pending->writes[pending->current_index].delta;
pending->deadline.tv_usec += delta * 1000;
while (pending->deadline.tv_usec < 0)
{
pending->deadline.tv_sec -= 1;
pending->deadline.tv_usec += 1000000;
}
while (pending->deadline.tv_usec >= 1000000)
{
pending->deadline.tv_sec += 1;
pending->deadline.tv_usec -= 1000000;
}
pending->current_index = (pending->current_index + 1) % PENDING_SIZE;
}
rcvdb[index].pending -= error;
total_size += error;
remove_pending(index);
pending->writes[pending->free_index] = current;
pending->free_index = (pending->free_index + 1) % PENDING_SIZE;
}
}
// Drops the oldest queued write of connection 'index'.
//
// Nothing happens if the connection has no pending writes. Otherwise the
// head of the circular queue is advanced by one slot. If the head then
// meets the free slot the queue is empty: the list is marked as not
// pending and the connection is taken out of the pending set with
// clear_pending(). If writes remain, the deadline is pushed forward by
// the delta (in milliseconds) recorded with the new head write.
void pop_pending_write(int index)
{
    pending_list *list = &rcvdb[index].pending;
    long delta_ms;

    if (!list->is_pending)
    {
        return;
    }

    list->current_index = (list->current_index + 1) % PENDING_SIZE;

    if (list->current_index == list->free_index)
    {
        // That was the last queued write.
        list->is_pending = 0;
        clear_pending(index, &write_fds);
        return;
    }

    // Split the millisecond delta into whole seconds and a sub-second
    // remainder, then carry any microsecond overflow into the seconds.
    delta_ms = list->writes[list->current_index].delta;
    list->deadline.tv_usec += (delta_ms % 1000) * 1000;
    list->deadline.tv_sec += (delta_ms / 1000)
        + (list->deadline.tv_usec / 1000000);
    list->deadline.tv_usec %= 1000000;
}
// Records 'deadline' as the most recent write time in the pending_list
// of connection 'index' and returns the difference in milliseconds
// between the previously recorded time and 'deadline'.
//
// The microsecond fields are each truncated to whole milliseconds
// before they are subtracted.
long update_write_time(int index, struct timeval deadline)
{
    struct timeval *previous = &rcvdb[index].pending.last_write;
    long elapsed_ms = deadline.tv_usec / 1000 - previous->tv_usec / 1000;

    elapsed_ms += deadline.tv_sec * 1000 - previous->tv_sec * 1000;
    *previous = deadline;
    return elapsed_ms;
}
void print_header(char *buf){
......@@ -417,7 +510,27 @@ void receive_sender(int i) {
}
void send_receiver(int index, int packet_size, fd_set * write_fds_copy){
void send_receiver(int index, int packet_size, struct timeval deadline)
{
if (rcvdb[index].pending.is_pending)
{
pending_write next;
next.size = packet_size;
next.delta = update_write_time(index, deadline);
push_pending_write(index, next);
}
else
{
int error = send_with_reconnect(index, packet_size);
if (error > 0)
{
// This means that there was a successful write, but
// incomplete. So we need to set up a pending write.
init_pending_list(index, error, deadline);
}
}
/*
int sockfd;
int error = 1, retry=0;
struct in_addr addr;
......@@ -464,19 +577,92 @@ void send_receiver(int index, int packet_size, fd_set * write_fds_copy){
// printf("Total: %d, Pending: %d\n", total_size, rcvdb[index].pending);
add_pending(index, packet_size - error);
}
*/
}
// Sends 'size' bytes taken from random_buffer to the peer of connection
// 'index', in non-blocking chunks of at most MAX_PAYLOAD_SIZE bytes.
// A 'size' of zero or less is treated as a request to send one byte.
//
// If the peer reset the connection, up to three reconnect-and-resend
// attempts are made for the current chunk.
//
// Returns the number of bytes remaining. This means a 0 if everything
// goes OK, and a positive number if some of the bytes couldn't be
// written (the socket would block or accepted only part of a chunk).
// Returns -1 if the connection was reset and reconnection failed; the
// connection has then been removed with remove_index().
// Any other send() failure is fatal: it is reported with perror() and
// the program ends through clean_exit(1).
int send_with_reconnect(int index, int size)
{
    int result = 0;
    int bytes_remaining = size;
    int done = 0;
    int sockfd = rcvdb[index].sockfd;

    if (bytes_remaining <= 0)
    {
        bytes_remaining = 1;
    }

    while (!done && bytes_remaining > 0)
    {
        int retry = 0;
        int error = 0;
        int send_errno = 0;
        int write_size = bytes_remaining;

        if (write_size > MAX_PAYLOAD_SIZE)
        {
            write_size = MAX_PAYLOAD_SIZE;
        }

        error = send(sockfd, random_buffer, write_size, MSG_DONTWAIT);
        // Capture errno immediately: the logging call below may change it.
        send_errno = errno;
        logWrite(PEER_WRITE, NULL, "Wrote %d bytes", error);

        // Handle failed connection
        while (error == -1 && send_errno == ECONNRESET && retry < 3)
        {
            reconnect_receiver(index);
            sockfd = rcvdb[index].sockfd;
            // Resend only the current chunk. Using the full request size
            // here would ignore both the chunk limit and the bytes that
            // earlier iterations have already written.
            error = send(sockfd, random_buffer, write_size, MSG_DONTWAIT);
            send_errno = errno;
            logWrite(PEER_WRITE, NULL, "Wrote %d reconnected bytes", error);
            retry++;
        }

        if (error == -1 && send_errno == ECONNRESET)
        {
            // Still disconnected after every retry: give up on this peer.
            remove_index(index, &write_fds);
            printf("Error: send_receiver() - failed send to %s three times. \n", ipToString(rcvdb[index].ip));
            result = -1;
            // Stop here so that we do not send on the removed connection.
            done = 1;
        }
        else if (error == -1
                 && (send_errno == EAGAIN || send_errno == EWOULDBLOCK))
        {
            // The socket buffer is full. Report what is left to the caller.
            result = bytes_remaining;
            done = 1;
        }
        else if (error == -1)
        {
            // Restore the error of the failed send() for perror().
            errno = send_errno;
            perror("send_receiver: send");
            clean_exit(1);
        }
        else
        {
            total_size += error;
            bytes_remaining -= error;
            if (error < write_size)
            {
                // Short write: the socket cannot take more right now.
                done = 1;
            }
            result = bytes_remaining;
        }
    }
    return result;
}
// Sets a socket buffer size and logs the size that is in effect
// afterwards.
//
// sockfd:  the socket to modify.
// optname: the SOL_SOCKET level option to set; the callers visible here
//          pass SO_SNDBUF or SO_RCVBUF.
// value:   the requested buffer size in bytes.
//
// The size is read back with getsockopt() rather than assumed, because
// the system may apply a different size than the one requested (Linux
// documents that it doubles the requested value).
// A failure of either setsockopt() or getsockopt() is fatal: it is
// reported with perror() and the program ends through clean_exit(1).
void change_socket_buffer_size(int sockfd, int optname, int value)
{
    int newSize = 0;
    // getsockopt() expects a socklen_t * for the option length.
    socklen_t newSizeLength = sizeof(newSize);
    int error = setsockopt(sockfd, SOL_SOCKET, optname, &value, sizeof(value));
    if (error == -1)
    {
        perror("setsockopt");
        clean_exit(1);
    }
    error = getsockopt(sockfd, SOL_SOCKET, optname, &newSize, &newSizeLength);
    if (error == -1)
    {
        perror("getsockopt verifying setsockopt");
        clean_exit(1);
    }
    logWrite(CONTROL_RECEIVE | DELAY_DETAIL, NULL,
             "Socket buffer size is now %d", newSize);
}
void process_control_packet(packet_info packet, fd_set * write_fds_copy){
void process_control_packet(packet_info packet, struct timeval deadline){
int index = -1;
int sockfd = -1;
......@@ -491,15 +677,16 @@ void process_control_packet(packet_info packet, fd_set * write_fds_copy){
{
case PACKET_WRITE:
logWrite(CONTROL_RECEIVE, NULL, "Told to write %d bytes", packet.value);
send_receiver(index, packet.value, write_fds_copy);
send_receiver(index, packet.value, deadline);
break;
case PACKET_SEND_BUFFER:
logWrite(CONTROL_RECEIVE, NULL, "Told to set send buffer to %d bytes",
packet.value);
logWrite(CONTROL_RECEIVE | DELAY_DETAIL, NULL,
"Told to set SEND buffer to %d bytes", packet.value);
change_socket_buffer_size(sockfd, SO_SNDBUF, packet.value);
break;
case PACKET_RECEIVE_BUFFER:
logWrite(CONTROL_RECEIVE, NULL, "Told to set receive buffer to %d bytes",
logWrite(CONTROL_RECEIVE | DELAY_DETAIL, NULL,
"Told to set RECEIVE buffer to %d bytes",
packet.value);
change_socket_buffer_size(sockfd, SO_RCVBUF, packet.value);
break;
......@@ -873,7 +1060,7 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy)
// printf("Sending packet to %s of size %ld\n", inet_ntoa(debug_temp),
// packet.size);
process_control_packet(packet, write_fds_copy);
process_control_packet(packet, *deadline);
packet_buffer_advance();
if (packet_buffer_more())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment