Commit 53e16304 authored by Jonathon Duerig's avatar Jonathon Duerig

Added pelab directory and starting files for the monitor and stub components.

parent c431cc86
import sys
import os
import time
import socket
import select
emulated_to_real = {'10.0.0.1' : '10.1.0.1',
'10.0.0.2' : '10.1.0.2'}
real_to_emulated = {'10.1.0.1' : '10.0.0.1',
'10.1.0.2' : '10.0.0.2'}
def main_loop():
# Initialize
quanta = 5 # in seconds
stub_address = 'planet0.pelab.tbres.emulab.net'
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((stub_address, 4200))
poll = select.poll()
poll.register(sys.stdin, select.POLLIN)
poll.register(conn, select.POLLIN)
done = False
while not done:
# Reset
dest_set = set([])
max_time = time.time() + quanta
# Collect data until the next quanta boundary
while time.time() < max_time:
fdlist = poll.poll(max(max_time - time.time(), 0) * 1000)
for pos in fdlist:
if pos[0] == sys.stdin.fileno() and not done:
# A line of data from tcpdump is available.
ip = get_next_destination()
dest_set.add(ip)
elif pos[0] == conn.fileno() and not done:
# A record for change in link characteristics is available.
done = not receive_characteristic(conn)
elif not done:
sys.stdout.write('fd: ' + str(pos[0]) + ' conn-fd: ' + str(conn.fileno()) + '\n')
# Update the stub
send_destinations(conn, dest_set)
sys.stdout.write('Loop-end\n')
def get_next_destination():
line = sys.stdin.readline()
ip_list = line.split('>', 1)[1].strip().split('.', 4)
result = ip_list[0] + '.' + ip_list[1] + '.' + ip_list[2] + '.' + ip_list[3]
sys.stdout.write('dest: ' + result + '\n')
return result
def receive_characteristic(conn):
buffer = conn.recv(12)
if len(buffer) == 12:
dest = load_int(buffer[0:4])
# TODO: Map real dest back into emulated dest
command = load_int(buffer[4:8])
value = load_int(buffer[8:12])
if command == 1:
# value is bandwidth in kbps
set_bandwidth(value)
elif command == 2:
# value is delay in milliseconds
set_delay(value)
elif command == 3:
# value is packet loss in packets per billion
set_loss(value/1000000000.0)
return True
elif len(buffer) == 0:
return False
def load_int(str):
result = 0
for i in range(4):
result = result | ((ord(str[i]) & 0xff) << (8*i))
return result
def set_bandwidth(kbps):
sys.stdout.write('<event> bandwidth=' + str(kbps) + '\n')
return set_link('bandwidth=' + str(kbps))
def set_delay(milliseconds):
return set_link('delay=' + str(milliseconds) + 'ms')
def set_loss(probability):
return set_link('plr=' + str(probability))
def set_link(ending):
command_base = '/usr/testbed/bin/tevc -e tbres/pelab now link0 modify '
return os.system(command_base + ending)
def send_destinations(conn, dest_set):
sys.stdout.write('<send> ' + str(0) + ' ' + str(len(dest_set)) + ' -- '
+ str(dest_set) + '\n')
output = save_int(0) + save_int(len(dest_set))
for dest in dest_set:
real_dest = emulated_to_real[dest]
ip_list = real_dest.split('.', 3)
ip = 0
for ip_byte in ip_list:
ip = ip << 8
ip = ip | (int(ip_byte, 10) & 0xff)
output = output + save_int(ip)
conn.sendall(output)
def save_int(number):
result = ''
for i in range(4):
result = result + chr(number & 0xff)
number = number >> 8
return result
main_loop()
sudo /usr/sbin/tcpdump -n -i eth1 "!(dst host 10.0.0.1) && dst net 10" | python monitor.py
#ifndef _STUB_H
#define _STUB_H
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <time.h>
#include <sys/time.h>
#include <fcntl.h>
#define QUANTA 5000000 // feed-loop interval in usec
#define CONTROL_PORT 4200 // the port users will be connecting to
#define TRAFFIC_PORT 3491 // the port client will be connecting to
#define CONCURRENT_SENDERS 50 // how many pending connections queue will hold
#define CONCURRENT_RECEIVERS 50 // how many pending connections queue will hold
#define MAX_PKTSIZE 100 // conservative for now
#define SIZEOF_LONG sizeof(long)
struct connection {
short valid;
int sockfd;
unsigned long ip;
time_t last_usetime;
};
typedef struct connection connection;
#endif
/*
** stubrcv.c
*/
#include "stub.h"
void print_header(char *buf){
int i, j, len=14;
printf("Buffer header len: %d \n", len);
for (i=0; i<len; i++){
j = buf[i] & 0x000000ff;
printf("%d: %u \n", i, buf[i]);
}
}
void receive_packets(int sockfd_snd) {
char buf[MAX_PKTSIZE];
int numbytes;
unsigned long tmpulong, sndsec, sndusec, delayusec;
struct timeval application_sendtime;
if ((numbytes=recv(sockfd_snd, buf, MAX_PKTSIZE, 0)) == -1) {
perror("recv");
exit(1);
}
if (getenv("Debug")!=NULL) print_header(buf);
//get the application send time at the first eight bytes
gettimeofday(&application_sendtime, NULL);
memcpy(&tmpulong, buf, sizeof(long));
sndsec = ntohl(tmpulong);
memcpy(&tmpulong, buf+sizeof(long), sizeof(long));
sndusec = ntohl(tmpulong);
delayusec = (application_sendtime.tv_sec-sndsec)*1000000+
(application_sendtime.tv_usec-sndusec);
printf("One Way Delay (usec): %ld \n",delayusec);
}
void sigchld_handler(int s)
{
while(waitpid(-1, NULL, WNOHANG) > 0);
}
int main(void)
{
int sockfd_snd, sockfd_rcv;
struct sockaddr_in my_addr; // my address information
struct sockaddr_in their_addr; // connector's address information
socklen_t sin_size;
struct sigaction sa;
int yes=1;
if ((sockfd_rcv = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
exit(1);
}
if (setsockopt(sockfd_rcv, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
my_addr.sin_family = AF_INET; // host byte order
my_addr.sin_port = htons(TRAFFIC_PORT); // short, network byte order
my_addr.sin_addr.s_addr = INADDR_ANY; // automatically fill with my IP
memset(&(my_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (getenv("Debug")!=NULL) printf("Listen on %s\n",inet_ntoa(my_addr.sin_addr));
if (bind(sockfd_rcv, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) {
perror("bind");
exit(1);
}
if (listen(sockfd_rcv, CONCURRENT_RECEIVERS) == -1) {
perror("listen");
exit(1);
}
sa.sa_handler = sigchld_handler; // reap all dead processes
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
perror("sigaction");
exit(1);
}
sin_size = sizeof(struct sockaddr_in);
while(1) { // main accept() loop
if ((sockfd_snd = accept(sockfd_rcv, (struct sockaddr *)&their_addr, &sin_size)) == -1) {
perror("accept");
continue;
}
if (getenv("Debug")!=NULL) printf("server: got connection from %s\n",inet_ntoa(their_addr.sin_addr));
if (!fork()) { // this is the child process
close(sockfd_rcv); // child doesn't need the listener
receive_packets(sockfd_snd);
close(sockfd_snd);
exit(0);
}
close(sockfd_snd); // parent doesn't need this
}
close(sockfd_rcv);
return 0;
}
/*
** stubsnd.c
*/
#include "stub.h"
connection db[CONCURRENT_RECEIVERS];
void init_db(void) {
int i;
for (i=0; i<CONCURRENT_RECEIVERS; i++){
db[i].valid = 0;
}
}
void insert_db(unsigned long ip, int sockfd) {
int i, next = -1;
time_t now = time(NULL);
double thisdiff, maxdiff = 0;
//find an unused entry or LRU entry
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (db[i].valid == 0) {
next = i;
break;
} else {
thisdiff = difftime(now, db[i].last_usetime);
if (thisdiff > maxdiff) {
maxdiff = thisdiff;
next = i;
}
}
}
if (db[next].valid == 1) {
close(db[next].sockfd);
}
db[next].valid = 1;
db[next].ip = ip;
db[next].sockfd= sockfd;
db[next].last_usetime = now;
}
int search_db(unsigned long indexip){
int i;
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (db[i].valid==1 && db[i].ip == indexip) {
db[i].last_usetime = time(NULL);
return db[i].sockfd ;
}
}
return -1; //no sockfd is -1
}
void clean_exit(int code){
int i;
for (i=0; i<CONCURRENT_RECEIVERS; i++){
if (db[i].valid == 1){
close(db[i].sockfd);
}
}
exit(code);
}
/* -------------------------------------------------------------------
* returns the TCP window size (on the sending buffer, SO_SNDBUF),
* or -1 on error.
* ------------------------------------------------------------------- */
int getsock_tcp_windowsize( int inSock )
{
int rc;
int theTCPWin = 0;
socklen_t len;
int mySock = inSock;
#ifdef SO_SNDBUF
if ( inSock < 0 ) {
/* no socket given, return system default
* allocate our own new socket */
mySock = socket( AF_INET, SOCK_STREAM, 0 );
}
/* send buffer -- query for buffer size */
len = sizeof( theTCPWin );
rc = getsockopt( mySock, SOL_SOCKET, SO_SNDBUF,
(char*) &theTCPWin, &len );
if ( rc < 0 ) {
return rc;
}
if ( inSock < 0 ) {
/* we allocated our own socket, so deallocate it */
close( mySock );
}
#endif
return theTCPWin;
} /* end getsock_tcp_windowsize */
void print_header(char *buf){
int i, j, len=14;
printf("Buffer header len: %d \n", len);
for (i=0; i<len; i++){
j = buf[i] & 0x000000ff;
printf("%d: %u \n", i, j);
}
}
int receive_message(int sockfd_monitor, char *buf) {
int numbytes;
struct timeval tv;
fd_set readset;
unsigned long timeout;
timeout = QUANTA/2; //QUANTA should be even
tv.tv_sec = timeout/1000000;
tv.tv_usec= timeout%1000000;
FD_ZERO(&readset);
FD_SET(sockfd_monitor, &readset);
bzero(buf, MAX_PKTSIZE);
//poll for feed-in
if (select(sockfd_monitor+1, &readset, NULL, NULL, &tv) >0) {
if ((numbytes=recv(sockfd_monitor, buf, MAX_PKTSIZE, 0)) == -1) {
return -1; //no data
}
if (numbytes == 0){
return 0; //socket closed
}
if (getenv("Debug")!=NULL) {
print_header(buf);
printf("numbytes: %d \n", numbytes);
}
return 1; //received message
} //if
return -1; //no data
}
void send_message(int sockfd_monitor, char *buf) {
char feedback[] = "test";
struct timeval tv;
fd_set writeset;
unsigned long timeout;
timeout = QUANTA/2; //QUANTA should be even
tv.tv_sec = timeout/1000000;
tv.tv_usec= timeout%1000000;
FD_ZERO(&writeset);
FD_SET(sockfd_monitor, &writeset);
//poll for feed-back
if (select(sockfd_monitor+1, NULL, &writeset, NULL, &tv) >0) {
printf("poll receivers. \n"); //poll func
memcpy(buf, feedback, sizeof(feedback));
if (send(sockfd_monitor, buf, MAX_PKTSIZE, 0) == -1){
perror("ERROR: send_messages() - send()");
clean_exit(1);
}
} //if
}
void send_packets(unsigned long destaddr, char *packet_buffer){
int sockfd_traffic, i;
struct sockaddr_in their_addr; // connector's address information
struct timeval application_sendtime;
unsigned long tmpulong;
if ((sockfd_traffic=search_db(destaddr)) == -1) {
if ((sockfd_traffic = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
clean_exit(1);
}
their_addr.sin_family = AF_INET; // host byte order
their_addr.sin_port = htons(TRAFFIC_PORT); // short, network byte order
their_addr.sin_addr.s_addr = destaddr;
memset(&(their_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (getenv("Debug")!=NULL) printf("Try to connect to %s \n", inet_ntoa(their_addr.sin_addr));
if (connect(sockfd_traffic, (struct sockaddr *)&their_addr, sizeof(struct sockaddr)) == -1) {
perror("connect");
clean_exit(1);
}
insert_db(destaddr, sockfd_traffic);
}
srandom(getpid());
for (i=0; i<MAX_PKTSIZE; i++) packet_buffer[i]=(char)(random()&0x000000ff);
//put the application send time at the first eight bytes
gettimeofday(&application_sendtime, NULL);
tmpulong = htonl(application_sendtime.tv_sec);
memcpy(packet_buffer, &tmpulong, sizeof(long));
tmpulong = htonl(application_sendtime.tv_usec);
memcpy(packet_buffer+sizeof(long), &tmpulong, sizeof(long));
if (send(sockfd_traffic, packet_buffer, MAX_PKTSIZE, 0) == -1){
perror("ERROR: send_packets() - send()");
clean_exit(1);
}
}
void sigchld_handler(int s)
{
while(waitpid(-1, NULL, WNOHANG) > 0);
}
int main(int argc, char *argv[]) {
int sockfd_control, sockfd_monitor;
struct sigaction sa;
char buf[MAX_PKTSIZE];
struct sockaddr_in their_addr; // connector's address information
struct sockaddr_in my_addr; // my address information
socklen_t sin_size;
int yes=1, i, longsz, rcvflag;
unsigned long tmpulong, destaddr, destnum;
char *nextptr;
if ((sockfd_control = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
exit(1);
}
if (setsockopt(sockfd_control, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(CONTROL_PORT);
my_addr.sin_addr.s_addr = INADDR_ANY; // automatically fill with my IP
memset(&(my_addr.sin_zero), '\0', 8); // zero the rest of the struct
if (bind(sockfd_control, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) {
perror("bind");
exit(1);
}
if (listen(sockfd_control, 1) == -1) {
perror("listen");
exit(1);
}
sin_size = sizeof(struct sockaddr_in);
init_db();
sa.sa_handler = sigchld_handler; // reap all dead processes
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
perror("sigaction");
exit(1);
}
longsz = sizeof(long);
while(1) { // main accept() loop
if ((sockfd_monitor = accept(sockfd_control, (struct sockaddr *)&their_addr, &sin_size)) == -1) {
perror("accept");
continue;
}
if (getenv("Debug")!=NULL) printf("sender: got connection from %s\n",inet_ntoa(their_addr.sin_addr));
//Make the monitor socket non-blocking
if (fcntl(sockfd_monitor, F_SETFL, O_NONBLOCK)<0){
perror("fcntl(sockfd_monitor, F_SETFL, O_NONBLOCK):");
exit(-1);
}
while (1) {
rcvflag = receive_message(sockfd_monitor, buf);
if (rcvflag == 0) break; //socket closed by the peer
if (rcvflag > 0) {
nextptr = buf+longsz;
memcpy(&tmpulong, nextptr, longsz);
destnum = ntohl(tmpulong);
nextptr += longsz;
for (i=0; i<destnum; i++){
memcpy(&tmpulong, nextptr, longsz);
destaddr = tmpulong; //address should stay in Network Order!
nextptr += longsz;
if (!fork()) { // this is the child process
close(sockfd_control); // child doesn't need the listener
close(sockfd_monitor);
send_packets(destaddr, buf);
clean_exit(0);
}
} //for
} //if
send_message(sockfd_monitor, buf);
} //while receive
close(sockfd_monitor);
}
close(sockfd_control);
return 0;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment