All new accounts created on Gitlab now require administrator approval. If you invite any collaborators, please let Flux staff know so they can approve the accounts.

Commit 888f222c authored by Jonathon Duerig's avatar Jonathon Duerig

Fixed a transient error discovered by Junxing. Rather, I've fixed a bug that...

Fixed a transient error discovered by Junxing. Rather, I've fixed a bug that could account for it and the error hasn't resurfaced after many runs. The transient symptom was when pcap only captured the first three packets and failed from then on.

All measurement times are now based on packet timestamps rather than gettimeofday(). Throughput measurement is much more consistent now, though a more thorough evaluation is still required.

Delay is averaged over a quantum. This is probably not the right thing to do, but it is better than the alternative (latest measurement). I am still in discussion with Sneha about what to do here.
parent f04e33df
......@@ -260,6 +260,10 @@ int insert_fake(unsigned long ip, unsigned short port)
last_loss_rates[index]=0;
delays[index]=0;
last_delays[index]=0;
delay_count[index]=0;
delay_records[index].head = NULL;
delay_records[index].tail = NULL;
delay_records[index].sample_number = 0;
result = index;
}
return result;
......@@ -332,6 +336,7 @@ void reconnect_receiver(int index)
last_loss_rates[index]=0;
delays[index]=0;
last_delays[index]=0;
remove_delay_samples(index);
}
void reset_receive_records(int index, unsigned long ip,
......
......@@ -40,6 +40,7 @@ struct my_ip {
sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
pcap_t* descr;
struct pcap_stat stats;
int pcapfd;
FILE *loss_log;
......@@ -55,7 +56,8 @@ int throughputInWindow(ThroughputAckState * state, unsigned int sequence)
}
// Reset the state of a connection completely.
void throughputInit(ThroughputAckState * state, unsigned int sequence)
void throughputInit(ThroughputAckState * state, unsigned int sequence,
struct timeval const * firstTime)
{
if (state->isValid == 0)
{
......@@ -63,7 +65,9 @@ void throughputInit(ThroughputAckState * state, unsigned int sequence)
state->nextSequence = sequence;
state->ackSize = 0;
state->repeatSize = 0;
gettimeofday(&state->lastTime, NULL);
// gettimeofday(&state->lastTime, NULL);
state->beginTime = *firstTime;
state->endTime = *firstTime;
state->isValid = 1;
}
}
......@@ -96,11 +100,13 @@ void throughputProcessSend(ThroughputAckState * state, unsigned int sequence,
}
// Notify the throughput monitor that some bytes have been acknowledged.
void throughputProcessAck(ThroughputAckState * state, unsigned int sequence)
void throughputProcessAck(ThroughputAckState * state, unsigned int sequence,
struct timeval const * timestamp)
{
if (! state->isValid) {
printf("throughputProcessAck() called with invalid state\n");
}
state->endTime = *timestamp;
if (throughputInWindow(state, sequence))
{
state->ackSize += sequence - state->firstUnknown + 1;
......@@ -120,18 +126,25 @@ unsigned int throughputTick(ThroughputAckState * state)
{
double result = 0.0;
double divisor = 1.0;
struct timeval now;
gettimeofday(&now, NULL);
divisor = now.tv_sec - state->lastTime.tv_sec;
divisor += (now.tv_usec - state->lastTime.tv_usec)/1000000.0;
result = (state->ackSize * 8.0) / (divisor * 1000.0);
// struct timeval now;
// gettimeofday(&now, NULL);
divisor = state->endTime.tv_sec - state->beginTime.tv_sec;
divisor += (state->endTime.tv_usec - state->beginTime.tv_usec)/1000000.0;
if (fabs(divisor) < 0.0001)
{
result = 0.0;
}
else
{
result = (state->ackSize * 8.0) / (divisor * 1000.0);
}
// printf("ByteCount: %u\n", state->ackSize);
// printf("UnAck ByteCount: %i (%i - %i)\n",
// state->nextSequence - state->firstUnknown,
// state->nextSequence, state->firstUnknown);
state->ackSize = 0;
state->repeatSize = 0;
state->lastTime = now;
state->beginTime = state->endTime;
return (unsigned int) result;
}
......@@ -366,7 +379,7 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
loss_records[path_id].total_counter++;
if (path->end == path->start){ //no previous packet
throughputInit(&throughput[path_id], seq_start);
throughputInit(&throughput[path_id], seq_start, &(pkthdr->ts));
throughputProcessSend(&throughput[path_id], seq_start, length);
return push_sniff_rcvdb(path_id, seq_start, seq_end, &(pkthdr->ts)); //new packet
} else {
......@@ -425,18 +438,22 @@ u_int16_t handle_IP(u_char *args, const struct pcap_pkthdr* pkthdr, const u_char
if (ack_bit == 1) { //has an acknowledgement
ack_seq = ntohl(tp->ack_seq);
throughputProcessAck(&throughput[path_id], ack_seq);
throughputProcessAck(&throughput[path_id], ack_seq, &(pkthdr->ts));
record_id = search_sniff_rcvdb(path_id, (unsigned long)(ack_seq-1));
if (record_id != -1) { //new ack received
if (flag_resend) { //if the ack is triggered by a resend, skip the delay calculation.
flag_resend = 0;
} else { //calculate the delay
int delay = 0;
msecs = floor((pkthdr->ts.tv_usec-sniff_rcvdb[path_id].records[record_id].captime.tv_usec)/1000.0+0.5);
delays[path_id] = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
append_delay_sample(path_id, delays[path_id]);
delay = (pkthdr->ts.tv_sec-sniff_rcvdb[path_id].records[record_id].captime.tv_sec)*1000 + msecs;
delays[path_id] += delay;
(delay_count[path_id])++;
append_delay_sample(path_id, delay, &(pkthdr->ts));
logWrite(DELAY_DETAIL, &(pkthdr->ts),
"Delay: %lu", delays[path_id]);
"Delay: %lu, Sum: %lu, Count: %lu", delay,
delays[path_id], delay_count[path_id]);
}
pop_sniff_rcvdb(path_id, (unsigned long)(ack_seq-1)); //advance the sniff window base
} //ack in rcvdb
......@@ -571,13 +588,13 @@ void init_pcap(int to_ms, unsigned short port, char * device, int is_live) {
exit(1);
}
}
// pcapfd = pcap_fileno(descr);
pcapfd = pcap_get_selectable_fd(descr);
if (pcapfd == -1)
{
fprintf(stderr, "Error: pcap file descriptor is not selectable\n");
exit(1);
}
}
init_sniff_rcvdb();
loss_log = fopen("loss.log", "w"); //loss log
......@@ -593,3 +610,22 @@ void sniff(void) {
}
// Refresh the cached capture statistics in 'stats' from the pcap
// handle 'descr'. If pcap_stats() returns -1, the pcap error is
// printed and clean_exit(1) is called.
void update_stats(void)
{
  if (pcap_stats(descr, &stats) == -1)
  {
    pcap_perror(descr, "pcap_stats()");
    clean_exit(1);
  }
}
// Return the received-packet counter (ps_recv) from the cached 'stats'.
// The value is only as fresh as the most recent update_stats() call.
unsigned int received_stat(void)
{
  unsigned int const received = stats.ps_recv;
  return received;
}
// Return the dropped-packet counter (ps_drop) from the cached 'stats'.
// The value is only as fresh as the most recent update_stats() call.
unsigned int dropped_stat(void)
{
  unsigned int const dropped = stats.ps_drop;
  return dropped;
}
......@@ -50,7 +50,7 @@ extern "C"
#define SIZEOF_LONG sizeof(long) //message building block
#define BANDWIDTH_OVER_THROUGHPUT 0 //the safety margin for estimating the available bandwidth
#define SNIFF_WINSIZE 131071 //from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux
#define SNIFF_TIMEOUT QUANTA/10 //in msec
#define SNIFF_TIMEOUT 0 //in msec
//magic numbers
#define CODE_BANDWIDTH 0x00000001
......@@ -115,6 +115,7 @@ extern connection rcvdb[CONCURRENT_RECEIVERS];
extern sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
extern unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
extern unsigned long last_delays[CONCURRENT_RECEIVERS];
extern unsigned long delay_count[CONCURRENT_RECEIVERS];
extern loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
extern unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
extern delay_record delay_records[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
......@@ -122,8 +123,13 @@ extern delay_record delay_records[CONCURRENT_RECEIVERS]; //delay is calculated a
extern void sniff(void);
extern void init_pcap(int to_ms, unsigned short port, char * device,
int is_live);
extern void append_delay_sample(int path_id, long sample_value);
void clean_exit(int);
extern void append_delay_sample(int path_id, long sample_value,
struct timeval const * timestamp);
extern void remove_delay_samples(int path_id);
extern void clean_exit(int);
extern void update_stats(void);
extern unsigned int received_stat(void);
extern unsigned int dropped_stat(void);
typedef struct
{
......@@ -131,7 +137,8 @@ typedef struct
unsigned int nextSequence;
unsigned int ackSize;
unsigned int repeatSize;
struct timeval lastTime;
struct timeval beginTime;
struct timeval endTime;
int isValid;
} ThroughputAckState;
......@@ -140,7 +147,8 @@ extern ThroughputAckState throughput[CONCURRENT_RECEIVERS];
// Returns the number of acknowledged bytes since the last
// throughputTick() call.
extern unsigned int throughputTick(ThroughputAckState * state);
extern void throughputInit(ThroughputAckState * state, unsigned int sequence);
extern void throughputInit(ThroughputAckState * state, unsigned int sequence,
struct timeval const * firstTime);
extern unsigned int bytesThisTick(ThroughputAckState * state);
// Add a potential sender to the pool.
......
......@@ -19,13 +19,12 @@ extern int optopt;
extern int opterr;
extern int optreset;
void append_delay_sample(int path_id, long sample_value);
//Global
short flag_debug, flag_standalone;
connection rcvdb[CONCURRENT_RECEIVERS];
unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
unsigned long last_delays[CONCURRENT_RECEIVERS];
unsigned long delay_count[CONCURRENT_RECEIVERS];
delay_record delay_records[CONCURRENT_RECEIVERS]; //delay list is calculated at the sender side
loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
......@@ -198,7 +197,8 @@ void init_random_buffer(void)
}
//Append a delay sample to the tail of the ith delay-record queue.
void append_delay_sample(int path_id, long sample_value) {
void append_delay_sample(int path_id, long sample_value,
struct timeval const * timestamp) {
delay_sample *sample = malloc(sizeof(delay_sample));
if (sample == NULL) {
perror("allocate");
......@@ -206,7 +206,7 @@ void append_delay_sample(int path_id, long sample_value) {
}
sample->next = NULL;
sample->value = sample_value;
gettimeofday(&(sample->time), NULL);
sample->time = *timestamp;
if (delay_records[path_id].tail == NULL) {
delay_records[path_id].head = sample;
delay_records[path_id].tail = sample;
......@@ -688,14 +688,26 @@ int send_delay_to_monitor(int monitor, int index)
char outbuf_delay[buffer_size];
unsigned long tmpulong;
if (delay_count[index] > 0)
{
delays[index] /= delay_count[index];
}
else
{
delays[index] = last_delays[index];
}
// If measurement changed since last send
if (abs((long)delays[index] - (long)last_delays[index])
> (long)(last_delays[index]/5)) {
// if (abs((long)delays[index] - (long)last_delays[index])
// > (long)(last_delays[index]/5)) {
if (delays[index] != last_delays[index])
{
// Insert the address info
char * buf = save_receiver_address(outbuf_delay, index);
logWrite(CONTROL_SEND, NULL, "Sending delay(%d) about stream(%hu:%s:%hu)",
delays[index], ipToString(rcvdb[index].ip));
delays[index], rcvdb[index].source_port,
ipToString(rcvdb[index].ip), rcvdb[index].dest_port);
// Insert the code number for delay
tmpulong = htonl(CODE_DELAY);
......@@ -710,7 +722,7 @@ int send_delay_to_monitor(int monitor, int index)
if (send_all(monitor, outbuf_delay, buffer_size) == 0){
return 0;
}
last_delays[index] = delays[index];
logWrite(CONTROL_SEND, NULL, "Sending delay success");
{
......@@ -736,6 +748,10 @@ int send_delay_to_monitor(int monitor, int index)
}
// printf("Sent delay: %ld\n", delays[i]);
}
last_delays[index] = delays[index];
delays[index] = 0;
delay_count[index] = 0;
return 1;
}
......@@ -750,6 +766,11 @@ int send_bandwidth_to_monitor(int monitor, int index)
// Insert the address info
char * buf = save_receiver_address(outbuf, index);
logWrite(CONTROL_SEND, NULL,
"Sending bandwidth(%lukbps) about stream(%hu:%s:%hu)",
bandwidth, rcvdb[index].source_port, ipToString(rcvdb[index].ip),
rcvdb[index].dest_port);
// Insert the code number for bandwidth
memcpy(buf, &code, SIZEOF_LONG);
buf += SIZEOF_LONG;
......@@ -1014,7 +1035,7 @@ int main(int argc, char *argv[]) {
fd_set read_fds_copy, write_fds_copy;
socklen_t sin_size;
struct timeval start_tv, left_tv;
int yes=1, flag_send_monitor=0;
int yes=1;//, flag_send_monitor=0;
struct timeval packet_deadline;
struct in_addr addr;
int flag_measure=0;
......@@ -1025,6 +1046,7 @@ int main(int argc, char *argv[]) {
FILE * logfile = NULL;
unsigned long quantum_no=0;
int logflags = LOG_NOTHING;
int select_count = 0;
gettimeofday(&packet_deadline, NULL);
......@@ -1045,7 +1067,7 @@ int main(int argc, char *argv[]) {
case 'd':
flag_debug = 1; break;
case 'f':
if (logfile != NULL)
if (logfile == NULL)
{
logfile = fopen(optarg, "a");
if (logfile == NULL)
......@@ -1148,7 +1170,7 @@ int main(int argc, char *argv[]) {
// rcvdb[0].valid = 1;
standalone_port = atoi(argv[2]);
inet_aton(argv[1], &addr);
insert_fake(addr.s_addr, atoi(argv[2]));
insert_fake(addr.s_addr, standalone_port);
// rcvdb[0].ip = addr.s_addr;
// rcvdb[0].sockfd= -1; //show error if used
// rcvdb[0].last_usetime = time(NULL);
......@@ -1237,12 +1259,15 @@ int main(int argc, char *argv[]) {
//main loop - the stubd runs forever
while (1) {
flag_send_monitor=0; //reset flag for each quanta
// flag_send_monitor=0; //reset flag for each quanta
gettimeofday(&start_tv, NULL); //reset start time for each quanta
// printf("Total: %d\n", total_size);
// printf("========== Quantum %lu ==========\n", quantum_no);
logWrite(MAIN_LOOP, NULL, "Quantum %lu", quantum_no);
update_stats();
logWrite(MAIN_LOOP, NULL, "PCAP Received: %u Dropped: %u",
received_stat(), dropped_stat());
quantum_no++;
//while in a quanta
......@@ -1250,11 +1275,14 @@ int main(int argc, char *argv[]) {
read_fds_copy = read_fds;
write_fds_copy = write_fds;
if (select(maxfd+1, &read_fds_copy, &write_fds_copy, NULL, &left_tv) == -1) {
select_count = select(maxfd+1, &read_fds_copy, &write_fds_copy, NULL,
&left_tv);
if (select_count == -1)
{
perror("select");
clean_exit(1);
}
// fprintf(stderr, "Select count: %d\n", select_count);
// Send out packets to our peers if the deadline has passed.
// logWrite(MAIN_LOOP, NULL, "Send normal packets to peers");
handle_packet_buffer(&packet_deadline, &write_fds_copy);
......@@ -1307,7 +1335,7 @@ int main(int argc, char *argv[]) {
inet_ntoa(their_addr.sin_addr));
FD_CLR(sockfd_rcv_monitor, &read_fds); //allow only one monitor connection
FD_SET(sockfd_monitor, &read_fds); //check the monitor connection for read
FD_SET(sockfd_monitor, &write_fds); //check the monitor connection for write
// FD_SET(sockfd_monitor, &write_fds); //check the monitor connection for write
if (sockfd_monitor > maxfd) { //keep track of the maximum
maxfd = sockfd_monitor;
}
......@@ -1320,7 +1348,7 @@ int main(int argc, char *argv[]) {
logWrite(MAIN_LOOP, NULL, "Receive control message from monitor");
if (receive_monitor(sockfd_monitor, &packet_deadline) == 0) { //socket_monitor closed by peer
FD_CLR(sockfd_monitor, &read_fds); //stop checking the monitor socket
FD_CLR(sockfd_monitor, &write_fds);
// FD_CLR(sockfd_monitor, &write_fds);
sockfd_monitor = -1;
FD_SET(sockfd_rcv_monitor, &read_fds); //start checking the receiver control socket
if (sockfd_rcv_monitor > maxfd) { // keep track of the maximum
......@@ -1329,32 +1357,34 @@ int main(int argc, char *argv[]) {
}
}
//send measurements to the monitor once in each quanta
if (sockfd_monitor!=-1 && flag_send_monitor==0 && FD_ISSET(sockfd_monitor, &write_fds_copy)) {
logWrite(MAIN_LOOP, NULL, "Send control message to monitor");
if (send_monitor(sockfd_monitor) == 0) { //socket_monitor closed by peer
logWrite(MAIN_LOOP, NULL, "Message to monitor failed");
FD_CLR(sockfd_monitor, &read_fds); //stop checking the monitor socket
FD_CLR(sockfd_monitor, &write_fds);
sockfd_monitor = -1;
FD_SET(sockfd_rcv_monitor, &read_fds); //start checking the receiver control socket
if (sockfd_rcv_monitor > maxfd) { // keep track of the maximum
maxfd = sockfd_rcv_monitor;
}
} else {
logWrite(MAIN_LOOP, NULL, "Message to monitor succeeded");
flag_send_monitor=1;
}
}
//sniff packets
if (FD_ISSET(pcapfd, &read_fds_copy)) {
logWrite(MAIN_LOOP, NULL, "Sniff packet stream");
// logWrite(MAIN_LOOP, NULL, "Sniff packet stream");
sniff();
}
} //while in quanta
//send measurements to the monitor once in each quanta
if (sockfd_monitor!=-1)
// && FD_ISSET(sockfd_monitor, &write_fds_copy)) {
{
logWrite(MAIN_LOOP, NULL, "Send control message to monitor");
if (send_monitor(sockfd_monitor) == 0) { //socket_monitor closed by peer
logWrite(MAIN_LOOP, NULL, "Message to monitor failed");
FD_CLR(sockfd_monitor, &read_fds); //stop checking the monitor socket
// FD_CLR(sockfd_monitor, &write_fds);
sockfd_monitor = -1;
FD_SET(sockfd_rcv_monitor, &read_fds); //start checking the receiver control socket
if (sockfd_rcv_monitor > maxfd) { // keep track of the maximum
maxfd = sockfd_rcv_monitor;
}
} else {
logWrite(MAIN_LOOP, NULL, "Message to monitor succeeded");
// flag_send_monitor=1;
}
}
// In testmode, we only start printing in the quanta we first see a packet
if (flag_standalone) {
print_measurements();
......@@ -1365,7 +1395,6 @@ int main(int argc, char *argv[]) {
printf("Test done - total bytes transmitted: %llu\n",total_bytes);
break;
}
} //while forever
packet_buffer_cleanup();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment