Commit 4fe2cec0 authored by Junxing Zhang's avatar Junxing Zhang

add changes at 12/27 for the fork version

parent 58a82518
......@@ -15,14 +15,22 @@
#include <time.h>
#include <sys/time.h>
#include <fcntl.h>
#include <netdb.h>
#define QUANTA 5000000 // feed-loop interval in usec
#define CONTROL_PORT 4200 // the port users will be connecting to
#define CONTROL_PORT 3490 // the port users will be connecting to
#define TRAFFIC_PORT 3491 // the port client will be connecting to
#define CONCURRENT_SENDERS 50 // how many pending connections queue will hold
#define CONCURRENT_RECEIVERS 50 // how many pending connections queue will hold
#define MAX_PKTSIZE 100 // conservative for now
#define SIZEOF_LONG sizeof(long)
#define SIZEOF_LONG sizeof(long) //message bulding block
//magic numbers
#define CODE_TRAFFIC 0x01000000
#define CODE_INQUIRY 0x02000000
#define CODE_BANDWIDTH 0x00000001
#define CODE_DELAY 0x00000002
#define CODE_LOSS 0x00000003
struct connection {
......
......@@ -4,6 +4,8 @@
#include "stub.h"
//Global
void print_header(char *buf){
int i, j, len=14;
......@@ -15,27 +17,73 @@ void print_header(char *buf){
}
void receive_packets(int sockfd_snd) {
char buf[MAX_PKTSIZE];
int numbytes;
unsigned long tmpulong, sndsec, sndusec, delayusec;
struct timeval application_sendtime;
unsigned long delay; //delay in usec
// unsigned long loss; //loss*1000000
// unsigned long abw; //abw in kbps
char inbuf[MAX_PKTSIZE], outbuf[3*SIZEOF_LONG];
int numbytes, longsz, nleft;
unsigned long tmpulong, sndsec, sndusec, pkttype;
struct timeval application_sendtime;
struct hostent *my_hostent;
char hostname[128];
char *next;
if ((numbytes=recv(sockfd_snd, buf, MAX_PKTSIZE, 0)) == -1) {
perror("recv");
exit(1);
}
if (getenv("Debug")!=NULL) print_header(buf);
//get the application send time at the first eight bytes
gettimeofday(&application_sendtime, NULL);
memcpy(&tmpulong, buf, sizeof(long));
sndsec = ntohl(tmpulong);
memcpy(&tmpulong, buf+sizeof(long), sizeof(long));
sndusec = ntohl(tmpulong);
delayusec = (application_sendtime.tv_sec-sndsec)*1000000+
(application_sendtime.tv_usec-sndusec);
printf("One Way Delay (usec): %ld \n",delayusec);
longsz = sizeof(long);
//blocking until the sender closes the socket
while ((numbytes=recv(sockfd_snd, inbuf, longsz, 0)) != 0) {
//read packet type first
if ( numbytes == -1) {
perror("recv");
exit(1);
}
//get the rcv timestamp in case it is traffic
gettimeofday(&application_sendtime, NULL);
//check the packet type
memcpy(&tmpulong, inbuf, longsz);
pkttype = ntohl(tmpulong);
if (pkttype == CODE_TRAFFIC) {
//read the rest of the traffic packet
next = inbuf;
nleft= MAX_PKTSIZE-longsz;
while (nleft > 0) {
if ((numbytes=recv(sockfd_snd, next, nleft, 0)) == -1) {
perror("recv");
exit(1);
}
nleft -= numbytes;
next += numbytes;
}
//get the traffic send time
memcpy(&tmpulong, inbuf, longsz);
sndsec = ntohl(tmpulong);
memcpy(&tmpulong, inbuf+longsz, longsz);
sndusec = ntohl(tmpulong);
delay = (application_sendtime.tv_sec-sndsec)*1000000+
(application_sendtime.tv_usec-sndusec);
if (getenv("Debug")!=NULL) printf("One Way Delay (usec): %ld \n",delay);
} else {
//inquiry
gethostname(hostname, sizeof(hostname));
if ((my_hostent=gethostbyname(hostname)) == NULL) { // get the host info
herror("gethostbyname");
exit(1);
}
memcpy(outbuf, my_hostent->h_addr, SIZEOF_LONG);
tmpulong = htonl(CODE_DELAY);
memcpy(outbuf+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(delay);
memcpy(outbuf+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
if (send(sockfd_snd, outbuf, 3*SIZEOF_LONG, 0) == -1){
perror("ERROR: send_packets() - send()");
exit(1);
}
//loss and bw come here
} //if
} //while
}
void sigchld_handler(int s)
......@@ -45,63 +93,63 @@ void sigchld_handler(int s)
int main(void)
{
int sockfd_snd, sockfd_rcv;
struct sockaddr_in my_addr; // my address information
struct sockaddr_in their_addr; // connector's address information
socklen_t sin_size;
struct sigaction sa;
int yes=1;
if ((sockfd_rcv = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
exit(1);
}
if (setsockopt(sockfd_rcv, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
my_addr.sin_family = AF_INET; // host byte order
my_addr.sin_port = htons(TRAFFIC_PORT); // short, network byte order
my_addr.sin_addr.s_addr = INADDR_ANY; // automatically fill with my IP
memset(&(my_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (getenv("Debug")!=NULL) printf("Listen on %s\n",inet_ntoa(my_addr.sin_addr));
if (bind(sockfd_rcv, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) {
perror("bind");
exit(1);
}
if (listen(sockfd_rcv, CONCURRENT_RECEIVERS) == -1) {
perror("listen");
exit(1);
}
sa.sa_handler = sigchld_handler; // reap all dead processes
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
perror("sigaction");
exit(1);
}
sin_size = sizeof(struct sockaddr_in);
while(1) { // main accept() loop
if ((sockfd_snd = accept(sockfd_rcv, (struct sockaddr *)&their_addr, &sin_size)) == -1) {
perror("accept");
continue;
}
if (getenv("Debug")!=NULL) printf("server: got connection from %s\n",inet_ntoa(their_addr.sin_addr));
if (!fork()) { // this is the child process
close(sockfd_rcv); // child doesn't need the listener
receive_packets(sockfd_snd);
close(sockfd_snd);
exit(0);
}
close(sockfd_snd); // parent doesn't need this
}
close(sockfd_rcv);
return 0;
int sockfd_snd, sockfd_rcv;
struct sockaddr_in my_addr; // my address information
struct sockaddr_in their_addr; // connector's address information
socklen_t sin_size;
struct sigaction sa;
int yes=1;
if ((sockfd_rcv = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
exit(1);
}
if (setsockopt(sockfd_rcv, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
my_addr.sin_family = AF_INET; // host byte order
my_addr.sin_port = htons(TRAFFIC_PORT); // short, network byte order
my_addr.sin_addr.s_addr = INADDR_ANY; // automatically fill with my IP
memset(&(my_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (getenv("Debug")!=NULL) printf("Listen on %s\n",inet_ntoa(my_addr.sin_addr));
if (bind(sockfd_rcv, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) {
perror("bind");
exit(1);
}
if (listen(sockfd_rcv, CONCURRENT_RECEIVERS) == -1) {
perror("listen");
exit(1);
}
sa.sa_handler = sigchld_handler; // reap all dead processes
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
perror("sigaction");
exit(1);
}
sin_size = sizeof(struct sockaddr_in);
while(1) { // main accept() loop
if ((sockfd_snd = accept(sockfd_rcv, (struct sockaddr *)&their_addr, &sin_size)) == -1) {
perror("accept");
continue;
}
if (getenv("Debug")!=NULL) printf("server: got connection from %s\n",inet_ntoa(their_addr.sin_addr));
if (!fork()) { // this is the child process
close(sockfd_rcv); // child doesn't need the listener
receive_packets(sockfd_snd);
close(sockfd_snd);
exit(0);
}
close(sockfd_snd); // parent doesn't need this
}
close(sockfd_rcv);
return 0;
}
......@@ -109,6 +109,31 @@ void print_header(char *buf){
}
}
int get_socket(unsigned long destaddr){
int sockfd_traffic;
struct sockaddr_in their_addr; // connector's address information
if ((sockfd_traffic=search_db(destaddr)) == -1) {
if ((sockfd_traffic = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
clean_exit(1);
}
their_addr.sin_family = AF_INET; // host byte order
their_addr.sin_port = htons(TRAFFIC_PORT); // short, network byte order
their_addr.sin_addr.s_addr = destaddr;
memset(&(their_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (getenv("Debug")!=NULL) printf("Try to connect to %s \n", inet_ntoa(their_addr.sin_addr));
if (connect(sockfd_traffic, (struct sockaddr *)&their_addr, sizeof(struct sockaddr)) == -1) {
perror("connect");
clean_exit(1);
}
insert_db(destaddr, sockfd_traffic);
}
return sockfd_traffic;
}
int receive_message(int sockfd_monitor, char *buf) {
int numbytes;
struct timeval tv;
......@@ -140,10 +165,11 @@ int receive_message(int sockfd_monitor, char *buf) {
}
void send_message(int sockfd_monitor, char *buf) {
char feedback[] = "test";
char inbuf[3*SIZEOF_LONG], outbuf[SIZEOF_LONG];
struct timeval tv;
fd_set writeset;
unsigned long timeout;
unsigned long timeout, tmpulong;
int i;
timeout = QUANTA/2; //QUANTA should be even
tv.tv_sec = timeout/1000000;
......@@ -152,50 +178,47 @@ void send_message(int sockfd_monitor, char *buf) {
FD_SET(sockfd_monitor, &writeset);
//poll for feed-back
if (select(sockfd_monitor+1, NULL, &writeset, NULL, &tv) >0) {
printf("poll receivers. \n"); //poll func
memcpy(buf, feedback, sizeof(feedback));
if (send(sockfd_monitor, buf, MAX_PKTSIZE, 0) == -1){
perror("ERROR: send_messages() - send()");
clean_exit(1);
}
} //if
tmpulong = CODE_INQUIRY;
memcpy(outbuf, &tmpulong, SIZEOF_LONG);
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (db[i].valid==1) {
if (send(db[i].sockfd, outbuf, SIZEOF_LONG, 0) == -1){
perror("ERROR: send_messages() - send() traffic");
clean_exit(1);
}
//no incomplete receive check here
if ((recv(db[i].sockfd, inbuf, 3*SIZEOF_LONG, 0)) == -1) {
perror("ERROR: send_messages() - recv()");
clean_exit(1);
}
if (select(sockfd_monitor+1, NULL, &writeset, NULL, &tv) >0) {
if (send(sockfd_monitor, inbuf, 3*SIZEOF_LONG, 0) == -1){
perror("ERROR: send_messages() - send() monitor");
clean_exit(1);
}
} //if
} //if
} //for
}
void send_packets(unsigned long destaddr, char *packet_buffer){
int sockfd_traffic, i;
struct sockaddr_in their_addr; // connector's address information
void send_packets(int sockfd_traffic, char *packet_buffer){
int i;
struct timeval application_sendtime;
unsigned long tmpulong;
if ((sockfd_traffic=search_db(destaddr)) == -1) {
if ((sockfd_traffic = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
clean_exit(1);
}
their_addr.sin_family = AF_INET; // host byte order
their_addr.sin_port = htons(TRAFFIC_PORT); // short, network byte order
their_addr.sin_addr.s_addr = destaddr;
memset(&(their_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (getenv("Debug")!=NULL) printf("Try to connect to %s \n", inet_ntoa(their_addr.sin_addr));
if (connect(sockfd_traffic, (struct sockaddr *)&their_addr, sizeof(struct sockaddr)) == -1) {
perror("connect");
clean_exit(1);
}
insert_db(destaddr, sockfd_traffic);
}
srandom(getpid());
for (i=0; i<MAX_PKTSIZE; i++) packet_buffer[i]=(char)(random()&0x000000ff);
//put the application send time at the first eight bytes
//put the packet type, application send time at the first eight bytes
tmpulong = htonl(CODE_TRAFFIC);
memcpy(packet_buffer, &tmpulong, SIZEOF_LONG);
gettimeofday(&application_sendtime, NULL);
tmpulong = htonl(application_sendtime.tv_sec);
memcpy(packet_buffer, &tmpulong, sizeof(long));
memcpy(packet_buffer+ SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
tmpulong = htonl(application_sendtime.tv_usec);
memcpy(packet_buffer+sizeof(long), &tmpulong, sizeof(long));
memcpy(packet_buffer+SIZEOF_LONG+SIZEOF_LONG, &tmpulong, SIZEOF_LONG);
//add send loop here!
if (send(sockfd_traffic, packet_buffer, MAX_PKTSIZE, 0) == -1){
perror("ERROR: send_packets() - send()");
clean_exit(1);
......@@ -208,7 +231,7 @@ void sigchld_handler(int s)
}
int main(int argc, char *argv[]) {
int sockfd_control, sockfd_monitor;
int sockfd_control, sockfd_monitor, sockfd_traffic;
struct sigaction sa;
char buf[MAX_PKTSIZE];
struct sockaddr_in their_addr; // connector's address information
......@@ -278,11 +301,12 @@ int main(int argc, char *argv[]) {
memcpy(&tmpulong, nextptr, longsz);
destaddr = tmpulong; //address should stay in Network Order!
nextptr += longsz;
sockfd_traffic = get_socket(destaddr);
if (!fork()) { // this is the child process
close(sockfd_control); // child doesn't need the listener
close(sockfd_monitor);
send_packets(destaddr, buf);
clean_exit(0);
close(sockfd_monitor);
send_packets(sockfd_traffic, buf);
exit(0); //child doesn't close traffic connections
}
} //for
} //if
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment