Commit e8e5b666 authored by Junxing Zhang's avatar Junxing Zhang

add the reset and initialization code in stubd.c

change stub-monitor.c to work with the new stubd
fixed one typo in stub-pcap.c
parent 09e9b15b
...@@ -27,10 +27,11 @@ void send_stub(int sockfd, char *addr, char *buf) { ...@@ -27,10 +27,11 @@ void send_stub(int sockfd, char *addr, char *buf) {
struct in_addr address; struct in_addr address;
unsigned long tmpulong; unsigned long tmpulong;
tmpulong = htonl(127L); tmpulong = htonl(127L); //no use for now
memcpy(buf, &tmpulong, SIZEOF_LONG); memcpy(buf, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(1L); tmpulong = htonl(1L); //1 destinate
memcpy(buf+SIZEOF_LONG, &tmpulong, SIZEOF_LONG); memcpy(buf+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
inet_aton(addr, &address); inet_aton(addr, &address);
tmpulong = address.s_addr; tmpulong = address.s_addr;
if (flag_debug) { if (flag_debug) {
...@@ -38,8 +39,13 @@ void send_stub(int sockfd, char *addr, char *buf) { ...@@ -38,8 +39,13 @@ void send_stub(int sockfd, char *addr, char *buf) {
printf("send the stub a probing address: %s \n", inet_ntoa(address)); printf("send the stub a probing address: %s \n", inet_ntoa(address));
} }
memcpy(buf+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG); memcpy(buf+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(5L); //interdeparture 5 ms
memcpy(buf+3*SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(80L); //packet size 80
memcpy(buf+4*SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
//should use send_all()! //should use send_all()!
if (send(sockfd, buf, 3*SIZEOF_LONG, 0) == -1){ if (send(sockfd, buf, 5*SIZEOF_LONG, 0) == -1){
perror("ERROR: send_stub() - send()"); perror("ERROR: send_stub() - send()");
exit(1); exit(1);
} }
...@@ -62,10 +68,10 @@ void receive_stub(int sockfd, char *buf) { ...@@ -62,10 +68,10 @@ void receive_stub(int sockfd, char *buf) {
int have_time(struct timeval *start_tvp, struct timeval *left_tvp){ int have_time(struct timeval *start_tvp, struct timeval *left_tvp){
struct timeval current_tv; struct timeval current_tv;
long left_usec, past_usec; long long left_usec, past_usec; //64-bit integer
gettimeofday(&current_tv, NULL); gettimeofday(&current_tv, NULL);
past_usec = (current_tv.tv_sec-start_tvp->tv_sec)*1000000+ past_usec = ((long long)(current_tv.tv_sec-start_tvp->tv_sec))*1000000+
(current_tv.tv_usec-start_tvp->tv_usec); (current_tv.tv_usec-start_tvp->tv_usec);
left_usec = QUANTA*1000-past_usec; //QUANTA is in msec left_usec = QUANTA*1000-past_usec; //QUANTA is in msec
if (left_usec > 0) { if (left_usec > 0) {
...@@ -109,14 +115,14 @@ int main(int argc, char *argv[]) ...@@ -109,14 +115,14 @@ int main(int argc, char *argv[])
ip = inet_ntoa(addr); ip = inet_ntoa(addr);
strcpy(public_addr0, ip); strcpy(public_addr0, ip);
if (flag_debug) { if (flag_debug) {
printf("public_addr0: %s", inet_ntoa(addr)); printf("public_addr0: %s \n", public_addr0);
} }
hp = gethostbyname(public_hostname1); hp = gethostbyname(public_hostname1);
bcopy(hp->h_addr, &addr, hp->h_length); bcopy(hp->h_addr, &addr, hp->h_length);
ip = inet_ntoa(addr); ip = inet_ntoa(addr);
strcpy(public_addr1, ip); strcpy(public_addr1, ip);
if (flag_debug) { if (flag_debug) {
printf("public_addr1: %s", inet_ntoa(addr)); printf("public_addr1: %s \n", public_addr1);
} }
...@@ -182,7 +188,7 @@ int main(int argc, char *argv[]) ...@@ -182,7 +188,7 @@ int main(int argc, char *argv[])
if (flag_debug) { if (flag_debug) {
printf("send to: %s \n", public_addr0); printf("send to: %s \n", public_addr0);
} }
send_stub(sockfd0, public_addr1, buf); //feed the stub with the private or public address send_stub(sockfd0, private_addr1, buf); //feed the stub with the private or public address
flag_send_stub0=1; flag_send_stub0=1;
} }
...@@ -190,7 +196,7 @@ int main(int argc, char *argv[]) ...@@ -190,7 +196,7 @@ int main(int argc, char *argv[])
if (flag_debug) { if (flag_debug) {
printf("send to: %s \n", public_addr1); printf("send to: %s \n", public_addr1);
} }
send_stub(sockfd1, public_addr0, buf); //feed the stub with the private or public address send_stub(sockfd1, private_addr0, buf); //feed the stub with the private or public address
flag_send_stub1=1; flag_send_stub1=1;
} }
......
...@@ -141,7 +141,7 @@ int nnmod(int num, int modulus){ ...@@ -141,7 +141,7 @@ int nnmod(int num, int modulus){
//Check if the seq is in the sequence block [seq_start, seq_end) //Check if the seq is in the sequence block [seq_start, seq_end)
//Take account of the seq wrap-arround //Take account of the seq wrap-arround
int in_seqence_block(unsigned long seq_start, unsigned long seq_end, unsigned long seq) { int in_sequence_block(unsigned long seq_start, unsigned long seq_end, unsigned long seq) {
if (seq_start < seq_end) { if (seq_start < seq_end) {
if (seq_start<=seq && seq<seq_end) if (seq_start<=seq && seq<seq_end)
return 1; //in range return 1; //in range
...@@ -200,7 +200,7 @@ int search_sniff_rcvdb(int path_id, u_long seqnum) { ...@@ -200,7 +200,7 @@ int search_sniff_rcvdb(int path_id, u_long seqnum) {
int next = path->start; int next = path->start;
while (next != (path->end)){ while (next != (path->end)){
if (in_seqence_block(path->records[next].seq_start, path->records[next].seq_end, seqnum)){ if (in_sequence_block(path->records[next].seq_start, path->records[next].seq_end, seqnum)){
return next; return next;
} }
next = nnmod(next+1, SNIFF_WINSIZE); next = nnmod(next+1, SNIFF_WINSIZE);
......
...@@ -21,7 +21,6 @@ loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the send ...@@ -21,7 +21,6 @@ loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the send
unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
connection snddb[CONCURRENT_SENDERS]; connection snddb[CONCURRENT_SENDERS];
unsigned long throughputs[CONCURRENT_SENDERS], last_throughputs[CONCURRENT_SENDERS];
fd_set read_fds,write_fds; fd_set read_fds,write_fds;
int maxfd; int maxfd;
...@@ -147,16 +146,21 @@ void init_random_buffer(void) ...@@ -147,16 +146,21 @@ void init_random_buffer(void)
} }
} }
//Initialize or reset state varialbes related to a receiver connection
void reset_rcv_entry(int i) {
rcvdb[i].valid = 0;
loss_records[i].loss_counter=0;
loss_records[i].total_counter=0;
last_loss_rates[i]=0;
delays[i]=0;
last_delays[i]=0;
}
void init(void) { void init(void) {
int i; int i;
for (i=0; i<CONCURRENT_RECEIVERS; i++){ for (i=0; i<CONCURRENT_RECEIVERS; i++){
rcvdb[i].valid = 0; reset_rcv_entry(i);
loss_records[i].loss_counter=0;
loss_records[i].total_counter=0;
last_loss_rates[i]=0;
delays[i]=0;
last_delays[i]=0;
} }
for (i=0; i<CONCURRENT_SENDERS; i++){ for (i=0; i<CONCURRENT_SENDERS; i++){
snddb[i].valid = 0; snddb[i].valid = 0;
...@@ -191,12 +195,18 @@ int insert_db(unsigned long ip, int sockfd, int dbtype) { ...@@ -191,12 +195,18 @@ int insert_db(unsigned long ip, int sockfd, int dbtype) {
} }
} }
if (db[next].valid == 1) { if (db[next].valid == 1) {
if (dbtype == 0 ) {
//if it is a rcvdb record, reset the corresponding sniff_rcvdb record
sniff_rcvdb[next].start= sniff_rcvdb[next].end;
}
FD_CLR(db[next].sockfd, &read_fds);
close(db[next].sockfd); close(db[next].sockfd);
if (dbtype == 0 ) { //if rcvdb
//reset related state variables
sniff_rcvdb[next].start = 0;
sniff_rcvdb[next].end = 0;
throughput[next].isValid = 0;
FD_CLR(db[next].sockfd, &write_fds);
reset_rcv_entry(next);
} else { //if snddb
FD_CLR(db[next].sockfd, &read_fds);
}
} }
db[next].valid = 1; db[next].valid = 1;
db[next].ip = ip; db[next].ip = ip;
...@@ -254,11 +264,7 @@ int get_rcvdb_index(unsigned long destaddr){ ...@@ -254,11 +264,7 @@ int get_rcvdb_index(unsigned long destaddr){
perror("connect"); perror("connect");
clean_exit(1); clean_exit(1);
} }
// update maxfd
if (sockfd > maxfd)
{
maxfd = sockfd;
}
dbindex=insert_db(destaddr, sockfd, 0); //insert rcvdb dbindex=insert_db(destaddr, sockfd, 0); //insert rcvdb
} }
return dbindex; return dbindex;
...@@ -315,64 +321,68 @@ int send_all(int sockfd, char *buf, int size) { ...@@ -315,64 +321,68 @@ int send_all(int sockfd, char *buf, int size) {
void receive_sender(int i) { void receive_sender(int i) {
char inbuf[MAX_PAYLOAD_SIZE]; char inbuf[MAX_PAYLOAD_SIZE];
//unsigned long tmpulong, sndsec, sndusec;
//struct timeval rcvtime;
if (recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0)== 0) { //connection closed if (recv(snddb[i].sockfd, inbuf, MAX_PAYLOAD_SIZE, 0)== 0) { //connection closed
snddb[i].valid = 0; snddb[i].valid = 0; //no additional clean-up because no other state varialbe is related
FD_CLR(snddb[i].sockfd, &read_fds); FD_CLR(snddb[i].sockfd, &read_fds);
//additional clean-up !!
} else {
/* outdated since we use sniff for delay measurement now
gettimeofday(&rcvtime, NULL);
memcpy(&tmpulong, inbuf, SIZEOF_LONG);
sndsec = ntohl(tmpulong);
memcpy(&tmpulong, inbuf+SIZEOF_LONG, SIZEOF_LONG);
sndusec = ntohl(tmpulong);
delays[i] = (rcvtime.tv_sec-sndsec)*1000+floor((rcvtime.tv_usec-sndusec)/1000+0.5);
if (flag_debug) printf("One Way Delay (msec): %ld \n",delays[i]);
*/
} }
} }
int debug_fd = 0;
void send_receiver(unsigned long destaddr, long size, fd_set * write_fds_copy){ void send_receiver(unsigned long destaddr, long size, fd_set * write_fds_copy){
int index; int index;
int sockfd; int sockfd;
int error = 1; int error = 1, retry=0;
struct in_addr addr;
if (size > MAX_PAYLOAD_SIZE) if (size > MAX_PAYLOAD_SIZE){
{
printf("size exceeded MAX_PAYLOAD_SIZE\n"); printf("size exceeded MAX_PAYLOAD_SIZE\n");
size = MAX_PAYLOAD_SIZE; size = MAX_PAYLOAD_SIZE;
} }
if (size <= 0) if (size <= 0){
{
size = 1; size = 1;
} }
index = get_rcvdb_index(destaddr); index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd; sockfd= rcvdb[index].sockfd;
//send packets //if (select says its ok to write) || (we just created this connection)
// if (select says its ok to write) || (we just created this connection) if (FD_ISSET(sockfd, write_fds_copy) || !FD_ISSET(sockfd, &write_fds)){
if (FD_ISSET(sockfd, write_fds_copy) || !FD_ISSET(sockfd, &write_fds))
{
FD_SET(sockfd, &write_fds);
debug_fd = sockfd;
error = send_all(sockfd, random_buffer, size); error = send_all(sockfd, random_buffer, size);
} }
while (error == 0){ //rcv conn closed while (error==0 && retry<3){ //rcv conn closed
// TODO: What reset stuff needs to go here? //clear up the failed connection
rcvdb[index].valid = 0; if (!FD_ISSET(sockfd, &write_fds)) { //new connection
FD_CLR(rcvdb[index].sockfd, &write_fds); rcvdb[index].valid = 0;
} else { //existent connection
//reset the related state variables
sniff_rcvdb[index].start = 0;
sniff_rcvdb[index].end = 0;
throughput[index].isValid = 0;
FD_CLR(rcvdb[index].sockfd, &write_fds);
reset_rcv_entry(index);
}
//try again
index = get_rcvdb_index(destaddr); index = get_rcvdb_index(destaddr);
sockfd= rcvdb[index].sockfd; sockfd= rcvdb[index].sockfd;
FD_SET(sockfd, &write_fds);
throughput[index].isValid = 0;
error = send_all(sockfd, random_buffer, size); error = send_all(sockfd, random_buffer, size);
retry++;
} //while
//if no success for 3 tries, clean up and report the error
if (error == 0) {
rcvdb[index].valid = 0;
addr.s_addr = destaddr;
printf("Error: send_receiver() - failed send to %s three times. \n", inet_ntoa(addr));
} else {
//if a new connection succeeds, set the fds
if (error!=0 && !FD_ISSET(sockfd, &write_fds)){
FD_SET(sockfd, &write_fds);
if (sockfd > maxfd) {
maxfd = sockfd;
}
}
} }
} }
...@@ -629,7 +639,7 @@ int main(int argc, char *argv[]) { ...@@ -629,7 +639,7 @@ int main(int argc, char *argv[]) {
// Send out packets to our peers if the deadline has passed. // Send out packets to our peers if the deadline has passed.
handle_packet_buffer(&packet_deadline, &write_fds_copy); handle_packet_buffer(&packet_deadline, &write_fds_copy);
//handle new sender packets //receive from existent senders
for (i=0; i<CONCURRENT_SENDERS; i++){ for (i=0; i<CONCURRENT_SENDERS; i++){
if (snddb[i].valid==1 && FD_ISSET(snddb[i].sockfd, &read_fds_copy)) { if (snddb[i].valid==1 && FD_ISSET(snddb[i].sockfd, &read_fds_copy)) {
receive_sender(i); receive_sender(i);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment