/* stub.h: shared constants, types, global variables and function prototypes for the stub. */
// Include guard. Deliberately not spelled _STUB_H: identifiers that begin
// with an underscore followed by an uppercase letter are reserved for the
// implementation (C11 7.1.3).
#ifndef STUB_H
#define STUB_H

// Give every declaration below C linkage when included from C++.
#ifdef __cplusplus
extern "C"
{
#endif

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <time.h>
#include <sys/time.h>
#include <fcntl.h>
#include <netdb.h>
#include <math.h>
#include <pcap.h>
#include <netinet/if_ether.h> 
#include <net/ethernet.h>
#include <netinet/ether.h> 
#include <netinet/ip.h> 
#include <netinet/udp.h>
#include <netinet/tcp.h>

#define STDIN 0             // file descriptor for standard input (same value as STDIN_FILENO)
#define QUANTA 500          // feed-loop interval in msec
#define MONITOR_PORT 4200   // the port the monitor connects to

#define SENDER_PORT          3491  // the port the stub senders connect to
#define PENDING_CONNECTIONS  10    // pending connections the queue will hold (presumably the listen() backlog)
#define CONCURRENT_SENDERS   50    // concurrent senders the stub maintains
#define CONCURRENT_RECEIVERS 50    // concurrent receivers the stub maintains

#define MAX_PAYLOAD_SIZE     64000 // size of the traffic payload

// Low water mark of the send buffer: when select() reports a socket as
// writable, the code assumes at least this many bytes of buffer space are
// available. NOTE(review): this constant does not configure the socket
// itself; whether the assumption holds depends on the platform's
// SO_SNDLOWAT behavior — confirm.
#define LOW_WATER_MARK 8192

#define MAX_TCPDUMP_LINE          256          // max line size of the tcpdump output
#define SIZEOF_LONG               sizeof(long) // message building block
#define BANDWIDTH_OVER_THROUGHPUT 0            // safety margin for estimating the available bandwidth

#define SNIFF_WINSIZE 131071 // from min(net.core.rmem_max, max(net.ipv4.tcp_rmem)) on Plab linux
#define SNIFF_TIMEOUT 0      // in msec

// Codes identifying the kind of measurement ("magic numbers").
#define CODE_BANDWIDTH  0x00000001
#define CODE_DELAY      0x00000002
#define CODE_LOSS       0x00000003
#define CODE_LIST_DELAY 0x00000004

// Selectors for the alternative bandwidth-estimation algorithms
// (presumably the values stored in bandwidth_method — confirm).
#define BANDWIDTH_AVERAGE 0
#define BANDWIDTH_MAX     1
#define BANDWIDTH_VEGAS   2

// Size in bytes of one monitor record: three longs followed by three
// unsigned shorts. NOTE(review): this depends on sizeof(long), so it is
// 18 bytes on ILP32 hosts and 30 bytes on LP64 hosts — confirm that both
// ends of the monitor connection agree.
#define MONITOR_RECORD_SIZE (sizeof(long)*3 + sizeof(unsigned short)*3)
// Packet/event kind codes (write, send buffer, receive buffer).
// NOTE(review): their exact use is not visible in this header.
#define PACKET_WRITE 1
#define PACKET_SEND_BUFFER 2
#define PACKET_RECEIVE_BUFFER 3

// Returned by the lookup functions when no matching or free slot exists.
enum { FAILED_LOOKUP = -1 };

// Callback invoked with the index of a connection.
typedef void (*handle_index)(int index);

// Callback that sends one connection's update to the monitor.
// `monitor` is the socket of the monitor; `index` is the index of the
// connection. Returns 1 for success and 0 for failure.
typedef int (*send_to_monitor)(int monitor, int index);

// One entry of the sender (snddb) or receiver (rcvdb) connection tables.
typedef struct {
  short  valid;               // nonzero while the slot is in use (presumably cleared when the entry is removed — confirm)
  int    sockfd;              // socket for this peer
  unsigned long ip;           // peer IP address (byte order not visible here — confirm)
  unsigned short stub_port;   // source port of the stub connection
  unsigned short source_port; // source port of the Emulab connection
  unsigned short dest_port;   // destination port of the Emulab connection
  time_t last_usetime;        // last monitor access time
  int pending;                // how many bytes are pending to this peer
} connection;
// One captured packet of a sniffed path.
// NOTE(review): field meanings below are inferred from the names —
// confirm against the sniffer implementation.
typedef struct {
  struct timeval captime;   // capture time
  unsigned long  seq_start; // start of the covered sequence range (presumably TCP)
  unsigned long  seq_end;   // end of the covered sequence range
  unsigned int   pkt_size;  // packet size
} sniff_record;
typedef struct {
97
  sniff_record records[SNIFF_WINSIZE];
98
99
  int start; //circular buffer pointers
  int end;
100
101
} sniff_path;
// Loss counters for one path (one entry per receiver in loss_records).
typedef struct {
  unsigned int loss_counter;  // in terms of packets
  unsigned int total_counter; // presumably total packets counted — confirm
} loss_record;
// A single delay measurement; samples are chained through `next`.
typedef struct delay_sample delay_sample;
struct delay_sample {
  delay_sample   *next;  // next sample in the chain
  unsigned long   value; // measured delay
  struct timeval  time;  // when the sample was taken
};

// Chain of delay samples for one path.
typedef struct {
  delay_sample *head;          // first sample of the chain
  delay_sample *tail;          // last sample of the chain
  int           sample_number; // number of samples (presumably the chain length)
} delay_record;
115

116
extern short  flag_debug;
117
118
extern short flag_standalone;
extern int bandwidth_method;
Junxing Zhang's avatar
Junxing Zhang committed
119
extern int pcapfd;
120
121
extern int maxfd;
extern connection snddb[CONCURRENT_SENDERS];
122
extern connection rcvdb[CONCURRENT_RECEIVERS];
123
124
extern sniff_path sniff_rcvdb[CONCURRENT_RECEIVERS];
extern unsigned long delays[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
125
extern unsigned long last_delays[CONCURRENT_RECEIVERS];
126
extern unsigned long delay_count[CONCURRENT_RECEIVERS];
127
extern loss_record loss_records[CONCURRENT_RECEIVERS]; //loss is calculated at the sender side
128
extern unsigned long last_loss_rates[CONCURRENT_RECEIVERS]; //loss per billion
129
extern delay_record delay_records[CONCURRENT_RECEIVERS]; //delay is calculated at the sender side
130
131
132
133
extern int is_live;

extern unsigned long max_throughput[CONCURRENT_RECEIVERS];
extern unsigned long base_rtt[CONCURRENT_RECEIVERS];
134

Junxing Zhang's avatar
Junxing Zhang committed
135
extern void sniff(void);
136
extern void init_pcap(int to_ms, unsigned short port, char * device);
137
138
139
140
141
142
143
extern void append_delay_sample(int path_id, long sample_value,
				struct timeval const * timestamp);
extern void remove_delay_samples(int path_id);
extern void clean_exit(int);
extern void update_stats(void);
extern unsigned int received_stat(void);
extern unsigned int dropped_stat(void);
144

145
146
147
148
149
// Acknowledgement-tracking state used to compute throughput for one
// receiver (one entry per receiver in the throughput[] table).
// NOTE(review): per-field meanings are not visible in this header beyond
// the names and the comment on fullAckSize — confirm against the
// implementation.
typedef struct
{
  unsigned int firstUnknown;
  unsigned int nextSequence;
  unsigned int ackSize;
  unsigned int fullAckSize; // full packet size
  unsigned int repeatSize;
  struct timeval beginTime;
  struct timeval endTime;
  int isValid;
} ThroughputAckState;

// Throughput-tracking state, one entry per receiver slot.
extern ThroughputAckState throughput[CONCURRENT_RECEIVERS];

// Returns the number of acknowledged bytes since the last
// throughputTick() call.
extern unsigned int throughputTick(ThroughputAckState * state);

// Initialize `state`. `sequence` and `firstTime` presumably give the
// starting sequence number and start time — confirm.
extern void throughputInit(ThroughputAckState * state, unsigned int sequence,
			   struct timeval const * firstTime);

// NOTE(review): semantics not visible in this header; by name, the byte
// count for the current tick — confirm against the implementation.
extern unsigned int bytesThisTick(ThroughputAckState * state);
/** Add slot `index` to the pool of potential senders. */
extern void add_empty_sender(int index);

/** Initialize `conn` as a receiver or sender connection. */
extern void init_connection(connection * conn);

/**
 * Call `function` with the index of every valid sender that is readable
 * (presumably as reported in `read_fds_copy`).
 */
extern void for_each_readable_sender(handle_index function,
                                     fd_set * read_fds_copy);

/**
 * Look up a sender by IP address and the source port (`stub_port`) of the
 * actual connection being created by the sending peer.
 * If the sender exists, its socket is closed and replaced with `sockfd`;
 * otherwise a new sender is created from `sockfd`.
 * @return FAILED_LOOKUP if the sender is not found and there are no empty
 *         slots (presumably the sender's index otherwise).
 */
extern int replace_sender_by_stub_port(unsigned long ip,
                                       unsigned short stub_port,
                                       int sockfd,
                                       fd_set * read_fds);
// Remove sender `index` from the database, invalidating it.
// `read_fds` is presumably the master read set the socket is dropped
// from — confirm.
void remove_sender_index(int index, fd_set * read_fds);

// Add a potential receiver to the pool.
void add_empty_receiver(int index);
// Run function on the index of each writable receiver which has some
// bytes pending.
void for_each_pending(handle_index function, fd_set * write_fds_copy);
194

195
196
197
// Run function on each index which will send an update to the
// monitor. Returns 1 for success and 0 for failure.
int for_each_to_monitor(send_to_monitor function, int monitor);
198

199
200
201
202
203
204
// Find the index of the receiver based on the IP address of the
// receiver, and the source and destination ports of the Emulab
// connection.
// Returns FAILED_LOOKUP on failure or the index of the receiver.
int find_by_address(unsigned long ip, unsigned short source_port,
                    unsigned short dest_port);

// Find the index of the receiver using the same criteria as
// find_by_address() (IP address, source and destination ports). If this
// fails, create a new connection to the IP address.
// Returns FAILED_LOOKUP on failure or the index of the receiver.
// Failure happens when the receiver is not already in the database
// and there are no more empty slots in the database.
int insert_by_address(unsigned long ip, unsigned short source_port,
                      unsigned short dest_port);
/** Insert a fake entry used purely for monitoring. */
extern int insert_fake(unsigned long ip, unsigned short port);

/** Reconnect receiver `index`: resets its socket, stub_port and sniff records. */
extern void reconnect_receiver(int index);

/**
 * Point receiver `index` at the given ip, source_port and dest_port.
 * The socket, stub_port and sniff records are left untouched; use
 * reconnect_receiver() to reset those.
 */
extern void reset_receive_records(int index,
                                  unsigned long ip,
                                  unsigned short source_port,
                                  unsigned short dest_port);

/**
 * Look up a receiver by IP address and the source port of its stub
 * connection.
 * @return the receiver's index, or FAILED_LOOKUP if there is none.
 */
extern int find_by_stub_port(unsigned long ip, unsigned short stub_port);

/** Move `index` into the pending category. */
extern void set_pending(int index, fd_set * write_fds);

/** Take `index` out of the pending category. */
extern void clear_pending(int index, fd_set * write_fds);

/** Invalidate receiver `index`, removing it from the database. */
extern void remove_index(int index, fd_set * write_fds);
#ifdef __cplusplus
} // closes the extern "C" block opened near the top of the header
#endif

#endif // end of include guard