Commit 9db0e590 authored by Jonathon Duerig's avatar Jonathon Duerig

Added the simple rotation model suggested by Rob. This takes care of item 1.

parent 98707d11
......@@ -120,6 +120,39 @@ void packet_buffer_add(char * buffer, int size)
}
}
void read_packet_info(packet_info * dest, char * source)
{
// Get ip
memcpy(&dest->ip, source, SIZEOF_LONG);
source += SIZEOF_LONG;
// Get source port
memcpy(&dest->source_port, source, sizeof(dest->source_port));
dest->source_port = ntohs(dest->source_port);
source += sizeof(dest->source_port);
// Get dest port
memcpy(&dest->dest_port, source, sizeof(dest->dest_port));
dest->dest_port = ntohs(dest->dest_port);
source += sizeof(dest->dest_port);
// Get delta time
memcpy(&dest->delta, source, SIZEOF_LONG);
dest->delta = ntohl(dest->delta);
source += SIZEOF_LONG;
// Get value
memcpy(&dest->value, source, SIZEOF_LONG);
dest->value = ntohl(dest->value);
source += SIZEOF_LONG;
// Get type
memcpy(&dest->type, source, sizeof(dest->type));
dest->type = ntohs(dest->type);
source += sizeof(dest->type);
}
// Get info about the next packet to send
packet_info packet_buffer_front(void)
{
......@@ -133,36 +166,8 @@ packet_info packet_buffer_front(void)
}
else
{
char * base = packet_buffer_head->buffer + packet_buffer_index;
// Get ip
memcpy(&result.ip, base, SIZEOF_LONG);
base += SIZEOF_LONG;
// Get source port
memcpy(&result.source_port, base, sizeof(result.source_port));
result.source_port = ntohs(result.source_port);
base += sizeof(result.source_port);
// Get dest port
memcpy(&result.dest_port, base, sizeof(result.dest_port));
result.dest_port = ntohs(result.dest_port);
base += sizeof(result.dest_port);
// Get delta time
memcpy(&result.delta, base, SIZEOF_LONG);
result.delta = ntohl(result.delta);
base += SIZEOF_LONG;
// Get value
memcpy(&result.value, base, SIZEOF_LONG);
result.value = ntohl(result.value);
base += SIZEOF_LONG;
// Get type
memcpy(&result.type, base, sizeof(result.type));
result.type = ntohs(result.type);
base += sizeof(result.type);
read_packet_info(&result,
packet_buffer_head->buffer + packet_buffer_index);
}
if (thisAddress != lastAddress)
{
......@@ -200,6 +205,10 @@ void packet_buffer_advance(void)
}
}
packet_info * write_buffer = NULL;
int write_buffer_size = 0;
int write_buffer_index = 0;
void init_random_buffer(void)
{
int i = 0;
......@@ -311,6 +320,10 @@ void clean_exit(int code){
}
packet_buffer_cleanup();
logCleanup();
if (write_buffer != NULL)
{
free(write_buffer);
}
exit(code);
}
......@@ -662,7 +675,11 @@ void change_socket_buffer_size(int sockfd, int optname, int value)
"Socket buffer size is now %d", newSize);
}
void process_control_packet(packet_info packet, struct timeval deadline){
void process_control_packet(packet_info packet
#ifdef USE_PACKET_BUFFER
, struct timeval deadline
#endif
){
int index = -1;
int sockfd = -1;
......@@ -676,8 +693,10 @@ void process_control_packet(packet_info packet, struct timeval deadline){
switch(packet.type)
{
case PACKET_WRITE:
#ifdef USE_PACKET_BUFFER
logWrite(CONTROL_RECEIVE, NULL, "Told to write %d bytes", packet.value);
send_receiver(index, packet.value, deadline);
#endif
break;
case PACKET_SEND_BUFFER:
logWrite(CONTROL_RECEIVE | DELAY_DETAIL, NULL,
......@@ -702,6 +721,8 @@ int receive_monitor(int sockfd, struct timeval * deadline) {
unsigned long destnum = 0;
char * packet_buffer = NULL;
int i = 0;
//receive first two longs
if (recv_all(sockfd, buf, 2*SIZEOF_LONG)==0) {
return 0;
......@@ -723,11 +744,34 @@ int receive_monitor(int sockfd, struct timeval * deadline) {
free(packet_buffer);
return 0;
}
#ifdef USE_PACKET_BUFFER
if (!packet_buffer_more())
{
gettimeofday(deadline, NULL);
}
packet_buffer_add(packet_buffer, buffer_size);
#else
logWrite(CONTROL_RECEIVE, NULL, "Processing buffer from monitor");
if (write_buffer != NULL)
{
free(write_buffer);
}
write_buffer = malloc(destnum * sizeof(packet_info));
write_buffer_size = destnum;
write_buffer_index = 0;
for (i = 0; i < destnum; ++i)
{
char * packet_pos = packet_buffer + i*MONITOR_RECORD_SIZE;
packet_info * write_pos = write_buffer + i;
read_packet_info(write_pos, packet_pos);
process_control_packet(*write_pos);
}
free(packet_buffer);
gettimeofday(deadline, NULL);
logWrite(CONTROL_RECEIVE, NULL, "Finished processing buffer from monitor");
#endif
return 1;
}
......@@ -1047,7 +1091,9 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy)
struct timeval now;
packet_info packet;
gettimeofday(&now, NULL);
int index = 0;
#ifdef USE_PACKET_BUFFER
if (packet_buffer_more())
{
packet = packet_buffer_front();
......@@ -1074,6 +1120,40 @@ void handle_packet_buffer(struct timeval * deadline, fd_set * write_fds_copy)
}
}
}
#else
if (write_buffer != NULL)
{
packet = write_buffer[write_buffer_index];
while (deadline->tv_sec < now.tv_sec ||
(deadline->tv_sec == now.tv_sec && deadline->tv_usec < now.tv_usec))
{
index = insert_by_address(packet.ip, packet.source_port,
packet.dest_port);
if (index == -1)
{
printf("No more connection slots.\n");
clean_exit(1);
}
logWrite(CONTROL_RECEIVE, NULL, "Told to write %d bytes", packet.value);
send_with_reconnect(index, packet.value);
write_buffer_index = (write_buffer_index + 1) % write_buffer_size;
packet = write_buffer[write_buffer_index];
while (packet.type != PACKET_WRITE)
{
write_buffer_index = (write_buffer_index + 1) % write_buffer_size;
packet = write_buffer[write_buffer_index];
}
deadline->tv_usec += packet.delta * 1000;
if (deadline->tv_usec > 1000000)
{
deadline->tv_sec += deadline->tv_usec / 1000000;
deadline->tv_usec = deadline->tv_usec % 1000000;
}
}
}
#endif
}
int have_time(struct timeval *start_tvp, struct timeval *left_tvp){
......@@ -1403,7 +1483,9 @@ int main(int argc, char *argv[]) {
// send to destinations which are writeable and are behind.
// logWrite(MAIN_LOOP, NULL, "Send pending packets to peers");
#ifdef USE_PACKET_BUFFER
for_each_pending(try_pending, &write_fds_copy);
#endif
// receive from existing senders
// logWrite(MAIN_LOOP, NULL, "Receive packets from peers");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment