All new accounts created on Gitlab now require administrator approval. If you invite any collaborators, please let Flux staff know so they can approve the accounts.

Commit 5fbb1830 authored by Jonathon Duerig's avatar Jonathon Duerig

Fixed bug where a single packet in a quantum caused an infinite loop. Fixed a...

Fixed bug where a single packet in a quantum caused an infinite loop. Fixed a bug where only socket buffer size updated in a quanum caused an infinite loop. Outstanding issue: Odd emulab side latency. Way too low.
parent 18b67d2a
......@@ -41,11 +41,6 @@ extern "C"
#define CONCURRENT_RECEIVERS 50 //concurrent receivers the stub maintains
#define MAX_PAYLOAD_SIZE 64000 //size of the traffic payload
// This is the low water mark of the send buffer. That is, if select
// says that a write buffer is writable, this is the minimum amount of
// buffer space available.
#define LOW_WATER_MARK 8192
#define MAX_TCPDUMP_LINE 256 //the max line size of the tcpdump output
#define SIZEOF_LONG sizeof(long) //message bulding block
#define BANDWIDTH_OVER_THROUGHPUT 0 //the safty margin for estimating the available bandwidth
......
......@@ -207,6 +207,7 @@ void packet_buffer_advance(void)
}
packet_info * write_buffer = NULL;
int write_delta_total = 0;
int write_buffer_size = 0;
int write_buffer_index = 0;
......@@ -759,19 +760,32 @@ int receive_monitor(int sockfd, struct timeval * deadline) {
}
write_buffer = malloc(destnum * sizeof(packet_info));
write_buffer_size = destnum;
write_buffer_index = 0;
write_delta_total = 0;
for (i = 0; i < destnum; ++i)
{
char * packet_pos = packet_buffer + i*MONITOR_RECORD_SIZE;
packet_info * write_pos = write_buffer + i;
read_packet_info(write_pos, packet_pos);
process_control_packet(*write_pos);
write_delta_total += write_pos->delta;
}
free(packet_buffer);
gettimeofday(deadline, NULL);
while (write_buffer[write_buffer_index].type != PACKET_WRITE)
write_buffer_index = 0;
while ((write_buffer_index != write_buffer_size)
&& (write_buffer[write_buffer_index].type != PACKET_WRITE))
{
write_buffer_index = (write_buffer_index + 1) % write_buffer_size;
++write_buffer_index;
}
if (write_buffer_index == write_buffer_size)
{
// This means that the only things in this list were state
// changes. And we've got that taken care of. So no write buffer.
free(write_buffer);
write_buffer = NULL;
write_buffer_index = 0;
write_buffer_size = 0;
write_delta_total = 0;
}
logWrite(CONTROL_RECEIVE, NULL, "Finished processing buffer from monitor");
#endif
......@@ -1141,6 +1155,7 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy)
while (deadline->tv_sec < now.tv_sec ||
(deadline->tv_sec == now.tv_sec && deadline->tv_usec < now.tv_usec))
{
int delta = 0;
index = insert_by_address(packet.ip, packet.source_port,
packet.dest_port);
if (index == -1)
......@@ -1156,14 +1171,37 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy)
buffer_full[index] = 1;
}
write_buffer_index = (write_buffer_index + 1) % write_buffer_size;
packet = write_buffer[write_buffer_index];
while (packet.type != PACKET_WRITE)
++write_buffer_index;
if (write_buffer_index == write_buffer_size)
{
// If we are going to wraparound, then we need to add the
// delay of the remaining quanta time. This is to prevent a
// single write from causing link saturation, for instance.
write_buffer_index = 0;
delta = QUANTA - write_delta_total;
if (delta < 0)
{
delta = 0;
}
}
while (write_buffer[write_buffer_index].type != PACKET_WRITE)
{
write_buffer_index = (write_buffer_index + 1) % write_buffer_size;
packet = write_buffer[write_buffer_index];
++write_buffer_index;
if (write_buffer_index == write_buffer_size)
{
// If we are going to wraparound, then we need to add the
// delay of the remaining quanta time. This is to prevent a
// single write from causing link saturation, for instance.
write_buffer_index = 0;
delta = QUANTA - write_delta_total;
if (delta < 0)
{
delta = 0;
}
}
}
deadline->tv_usec += packet.delta * 1000;
delta += packet.delta;
deadline->tv_usec += delta * 1000;
if (deadline->tv_usec > 1000000)
{
deadline->tv_sec += deadline->tv_usec / 1000000;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment