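// stub.h - declarations shared by the stub: tuning constants, magic-number
// codes, per-connection bookkeeping structures, global state, and the
// connection-database API.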
#ifndef _STUB_H
#define _STUB_H

#ifdef __cplusplus
extern "C"
{
#endif

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <time.h>
#include <sys/time.h>
#include <sys/select.h> // fd_set, used by the connection-database API below
#include <fcntl.h>
#include <netdb.h>
#include <math.h>
#include <pcap.h>
#include <netinet/if_ether.h>
#include <net/ethernet.h>
#include <netinet/ether.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/tcp.h>

#define STDIN STDIN_FILENO // file descriptor for standard input (0)
#define QUANTA 500    //feed-loop interval in msec
#define MONITOR_PORT 4200 //the port the monitor connects to
#define SENDER_PORT  3491 //the port the stub senders connect to
#define PENDING_CONNECTIONS  10  //the pending connections the queue will hold
#define CONCURRENT_SENDERS   50  //concurrent senders the stub maintains
#define CONCURRENT_RECEIVERS 50  //concurrent receivers the stub maintains
#define MAX_PAYLOAD_SIZE     64000 //size of the traffic payload

#define MAX_TCPDUMP_LINE     256 //the max line size of the tcpdump output
// Parenthesized so the expansion is a single operand in any expression.
#define SIZEOF_LONG (sizeof(long)) //message building block
#define BANDWIDTH_OVER_THROUGHPUT 0 //the safety margin for estimating the available bandwidth
#define SNIFF_WINSIZE 131071 //from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux
#define SNIFF_TIMEOUT 0 //in msec

//magic numbers
#define CODE_BANDWIDTH  0x00000001
#define CODE_DELAY      0x00000002
#define CODE_LOSS       0x00000003
#define CODE_LIST_DELAY 0x00000004
#define CODE_MAX_DELAY  0x00000005

//magic numbers for alternative algorithms
#define BANDWIDTH_AVERAGE 0
#define BANDWIDTH_MAX 1
#define BANDWIDTH_VEGAS 2
#define BANDWIDTH_BUFFER 3

// Bytes in one monitor record: three longs followed by three unsigned shorts.
#define MONITOR_RECORD_SIZE (sizeof(long)*3 + sizeof(unsigned short)*3)

#define PACKET_WRITE 1
#define PACKET_SEND_BUFFER 2
#define PACKET_RECEIVE_BUFFER 3

// Sentinel returned by the lookup functions when no slot matches.
enum { FAILED_LOOKUP = -1 };
// The int is the index of the connection.
typedef void (*handle_index)(int);
// The first int is the socket of the monitor.
// The second int is the index of the connection.
// This returns 1 for success and 0 for failure.
typedef int (*send_to_monitor)(int, int);
// Information about each write that is going to happen.
typedef struct
{
  // Size of the write. NOTE(review): presumably bytes - confirm.
  long size;
  // delta is a time difference in milliseconds
  long delta;
} pending_write;

// The total number of writes we will queue up before discarding.
enum { PENDING_SIZE = 40 };

// The data structure which holds the writes pending for a particular
// connection.
typedef struct
{
  // This should start out false. If it is false, then the rest of the
  // data in this struct is undefined.
  int is_pending;
  // When is the earliest moment at which we should try a write?
  struct timeval deadline;
  // When did the last write occur? Used to determine inter-write times.
  struct timeval last_write;
  // The list of the actual writes themselves. This is a circular
  // queue of PENDING_SIZE slots. When it runs out of room it
  // overwrites the oldest pending write.
  pending_write writes[PENDING_SIZE];
  // The index of the current write under consideration.
  int current_index;
  // The index of the next free slot. This may be the same as
  // 'current_index'. If this is so, then the write indexed by
  // 'current_index' is the oldest write pending and is to be
  // overwritten if another write comes along.
  int free_index;
} pending_list;

// Initializes the pending write structure.
// NOTE(review): 'index' presumably selects the connection slot whose
// pending list is affected - confirm against the definition.
void init_pending_list(int index, long size, struct timeval time);

// Adds a pending write onto the tail of the list.
void push_pending_write(int index, pending_write current);

// Removes the oldest pending write.
void pop_pending_write(int index);
typedef struct {
121 122 123
  short  valid;
  int    sockfd;
  unsigned long ip;
124 125 126
  unsigned short stub_port;
  unsigned short source_port;
  unsigned short dest_port;
Junxing Zhang's avatar
Junxing Zhang committed
127
  time_t last_usetime; //last monitor access time
128
  pending_list pending; // What writes are pending?
129 130
} connection;
// One sniffed-packet entry, stored in the sniff_path circular buffer.
typedef struct {
  struct timeval captime;   // capture time
  // NOTE(review): presumably the first/last sequence numbers the packet
  // covers - confirm against the sniffer code.
  unsigned long  seq_start;
  unsigned long  seq_end;
  unsigned int   pkt_size;
} sniff_record;
typedef struct {
137
  sniff_record records[SNIFF_WINSIZE];
138 139
  int start; //circular buffer pointers
  int end;
140 141
} sniff_path;
// Packet-loss counters (one per receiver slot in loss_records).
typedef struct {
  unsigned int loss_counter; //in terms of packet
  unsigned int total_counter;
} loss_record;
// A single delay measurement; samples are chained through 'next'.
typedef struct delay_sample delay_sample;
struct delay_sample {
  delay_sample   *next;   // following sample in the chain
  unsigned long   value;  // measured delay (units set by the producer - confirm)
  struct timeval  time;   // timestamp attached to the sample
};

// Linked list of delay samples, with head and tail pointers.
typedef struct {
  delay_sample *head;
  delay_sample *tail;
  int           sample_number; // NOTE(review): presumably the list length
} delay_record;
155

156
extern short  flag_debug;
157 158
extern short flag_standalone;
extern int bandwidth_method;
Junxing Zhang's avatar
Junxing Zhang committed
159
extern int pcapfd;
160 161
extern int maxfd;
extern connection snddb[CONCURRENT_SENDERS];
162
extern connection rcvdb[CONCURRENT_RECEIVERS];
163 164
extern sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
extern unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
165
extern unsigned long last_delays[CONCURRENT_RECEIVERS];
166
extern unsigned long delay_count[CONCURRENT_RECEIVERS];
167
extern loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
168
extern unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
169
extern delay_record delay_records[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
170 171
extern int is_live;

172
extern int last_through[CONCURRENT_RECEIVERS];
173 174
extern int buffer_full[CONCURRENT_RECEIVERS];

175 176
extern unsigned long max_throughput[CONCURRENT_RECEIVERS];
extern unsigned long base_rtt[CONCURRENT_RECEIVERS];
177

178 179 180
extern int max_delay[CONCURRENT_RECEIVERS];
extern int last_max_delay[CONCURRENT_RECEIVERS];

Junxing Zhang's avatar
Junxing Zhang committed
181
// Packet capture and statistics entry points (defined elsewhere).
extern void sniff(void);
extern void init_pcap(int to_ms, unsigned short port, char * device);
// NOTE(review): sample_value is 'long' while delay_sample.value is
// 'unsigned long'; negative samples would wrap when stored - confirm.
extern void append_delay_sample(int path_id, long sample_value,
                                struct timeval const * timestamp);
extern void remove_delay_samples(int path_id);
extern void clean_exit(int);
extern void update_stats(void);
extern unsigned int received_stat(void);
extern unsigned int dropped_stat(void);
// Acknowledgement-tracking state for throughput measurement; one per
// receiver slot (see the 'throughput' array).
// NOTE(review): the field semantics below are inferred from names only.
typedef struct
{
  unsigned int firstUnknown;
  unsigned int nextSequence;
  unsigned int ackSize;
  unsigned int fullAckSize; //full packet size
  unsigned int repeatSize;
  struct timeval beginTime;
  struct timeval endTime;
  int isValid;
} ThroughputAckState;

extern ThroughputAckState throughput[CONCURRENT_RECEIVERS];

// Returns the number of acknowledged bytes since the last
// throughputTick() call.
extern unsigned int throughputTick(ThroughputAckState * state);
// NOTE(review): presumably (re)initializes 'state' starting at sequence
// number 'sequence' and time 'firstTime' - confirm against the definition.
extern void throughputInit(ThroughputAckState * state, unsigned int sequence,
                           struct timeval const * firstTime);
extern unsigned int bytesThisTick(ThroughputAckState * state);
// Add a potential sender to the pool.
void add_empty_sender(int index);

// Initialize a receiver or sender connection.
void init_connection(connection * conn);

// Run function on the index of every valid sender that is readable.
void for_each_readable_sender(handle_index function, fd_set * read_fds_copy);

// Try to find the sender by the IP address and the source port of the
// actual connection being created by the seinding peer. If there is a
// sender, the socket is closed and replaced with the sockfd
// argument. If the sender cannot be found, create a new sender based
// on the sockfd argument.
// Returns FAILED_LOOKUP if the sender cannot be found and there are
// no empty slots.
int replace_sender_by_stub_port(unsigned long ip, unsigned short stub_port,
                                int sockfd, fd_set * read_fds);
230

231 232
// Remove the index from the database, invalidating it.
void remove_sender_index(int index, fd_set * read_fds);
233

234 235
// Add a potential receiver to the pool.
void add_empty_receiver(int index);
236

237 238 239
// Run function on the index of each writable receiver which has some
// bytes pending.
void for_each_pending(handle_index function, fd_set * write_fds_copy);
240

241 242 243
// Run function on each index which will send an update to the
// monitor. Returns 1 for success and 0 for failure.
int for_each_to_monitor(send_to_monitor function, int monitor);
244

245 246 247 248 249 250
// Find the index of the receiver based on the IP address of the
// receiver, and the source and destination ports of the Emulab
// connection.
// Returns FAILED_LOOKUP on failure or the index of the receiver.
int find_by_address(unsigned long ip, unsigned short source_port,
                    unsigned short dest_port);
251 252


253 254 255 256 257 258 259
// Find the index of the receiver based on the above criteria. If this
// fails, create a new connection to the IP address.
// Returns FAILED_LOOKUP on failure or the index of the receiver.
// Failure happens when the receiver is not already in the database
// and there are no more empty slots in the database.
int insert_by_address(unsigned long ip, unsigned short source_port,
                      unsigned short dest_port);
260

261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
// Insert a fake entry for purely monitoring purposes.
int insert_fake(unsigned long ip, unsigned short port);

// Reconnect to a receiver. Resets the socket, the stub_port, and the
// sniff records.
void reconnect_receiver(int index);


// Reset a receiver to point to the ip source_port and dest_port
// specified. Note that this does not change the socket or stub_port
// at all (call reconnect for that). Nor does it change the sniff records.
void reset_receive_records(int index, unsigned long ip,
                           unsigned short source_port,
                           unsigned short dest_port);

// Find the index of the receiver based on the IP address and the
// source port number of the stub connection.
// Returns FAILED_LOOKUP on failure.
int find_by_stub_port(unsigned long ip, unsigned short stub_port);

// Put the index into the pending category.
void set_pending(int index, fd_set * write_fds);

// Remove the index from the pending category.
void clear_pending(int index, fd_set * write_fds);

// Remove a receiver from the database, invalidating it.
void remove_index(int index, fd_set * write_fds);

#ifdef __cplusplus
}
#endif

#endif