00001
00002
00003
00004
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <fcntl.h>      /* fcntl(), O_NONBLOCK — used by init_sched */
#include <signal.h>     /* sigset_t for pthread_sigmask — used by scheduler */
#include <unistd.h>     /* close() — used by cleanup_sched */
#include <semaphore.h>
#include <pthread.h>
#include <sys/socket.h>
#include <linux/tcp.h>
#include <linux/ip.h>
#include <linux/in.h>

#include "tcpproxy.h"
#include "misc.h"
#include "tcptimer.h"
#include "tcp.h"
#include "scheduler.h"
00022
/*
 * Lifecycle states of a packet queue slot.
 *
 * BUG FIX: the original wrote `enum queue_state { ... } Tqueue_state;`
 * without `typedef`, which defines a file-scope *variable* named
 * Tqueue_state rather than the type alias implied by the Tpacket_sent /
 * Tpacket_queue naming convention used below.
 */
typedef enum queue_state
{
    QUEUE_STATE_FREE = 0,   /* slot available for allocation */
    QUEUE_STATE_USED,       /* queue in active use */
    QUEUE_STATE_CLOSED      /* closed; drained, then returned to FREE */
} Tqueue_state;
00029
00030
/*
 * Book-keeping record for a forwarded TCP segment that has not yet been
 * acknowledged.  Records are kept on a per-queue doubly linked list
 * (Tpacket_queue.sent_hd), ordered by sequence number by add_tcp_sent().
 */
typedef struct packet_sent {
    struct packet_queue *q;   /* queue this segment was sent from */
    unsigned seq;             /* TCP sequence number, host byte order */
    unsigned size;            /* payload bytes; set to 0 once the bytes
                                 have been credited back (ack or clear),
                                 so packet_timeout() won't credit twice */
    long mtime;               /* send time from get_mclock() */
    struct packet_sent *prev;
    struct packet_sent *next;
} Tpacket_sent;
00039
00040
/*
 * A packet queue: a circular buffer of pending packets plus the DRR
 * scheduling state and the list of sent-but-unacked segments.
 * The buffer is empty when head == tail; it holds at most max-1 usable
 * slots as managed by queue_size()/queue_packet().
 */
typedef struct packet_queue {
    enum queue_state state;   /* FREE / USED / CLOSED */
    Tpacket_data **tab;       /* circular buffer of queued packets */
    int max;                  /* capacity of tab */
    int head;                 /* index of next packet to dequeue */
    int tail;                 /* index of next free slot */
    unsigned in_flight;       /* unacked bytes outstanding for this queue */
    unsigned dc;              /* DRR deficit counter carried between rounds */
    unsigned weight;          /* DRR weight (quantum multiplier) */
    Tpacket_sent *sent_hd;    /* head of sent-record list, seq-ordered */
} Tpacket_queue;
00052
/* Raw IP socket (IP_HDRINCL) used to re-inject scheduled packets. */
static int sock;
/* Pool of per-connection TCP queues, indexed by queue number. */
static Tpacket_queue tcp_pq[TCP_MAX_QUEUES];
/* Fixed queues for traffic bypassing the DRR machinery. */
static Tpacket_queue hipri_pq;
static Tpacket_queue lowpri_pq;
/* Protects all queue state, counters and sent-record lists below. */
static pthread_mutex_t sched_mut;
/* Wakes the scheduler thread when work or window credit appears. */
static sem_t send_sem;
/* Total packets across all queues (under sched_mut). */
static long queued_pkts;
/* Unacked TCP bytes across all queues; bounded by max_tcp_in_flight. */
static unsigned tcp_in_flight;
static int max_tcp_in_flight = TCP_WIN_CLAMP_SIZE;
00062
00063
00064
00065
00066
/*
 * Initialise the scheduler: open the raw IP socket used to re-inject
 * packets, create the scheduler lock and wakeup semaphore, and set up
 * the high/low priority queues plus the pool of TCP queues.
 * Exits the process (safe_exit) on any unrecoverable error.
 */
void init_sched()
{
    int i;
    Tpacket_queue *q;
    int old_flags;
    int on = 1;

    /* Raw socket; with IP_HDRINCL we supply the complete IP header. */
    sock = socket(AF_INET, SOCK_RAW, IPPROTO_RAW);

    if(sock == -1) {
        EPRINT("init_sched: error opening raw datagram socket.\n");
        perror("init_sched");
        safe_exit(EXIT_ERR_SOCKET);
    }

    if(setsockopt(sock, IPPROTO_IP, IP_HDRINCL, &on, sizeof(on))) {
        EPRINT("init_sched: error setting IP_HDRINCL.\n");
        perror("init_sched");
        safe_exit(EXIT_ERR_SOCKET);
    }

    /* Force blocking mode: clear O_NONBLOCK, preserving other flags. */
    old_flags = fcntl (sock, F_GETFL, 0);
    if (old_flags == -1) {
        EPRINT("init_sched: error reading socket flags.\n");
        perror("init_sched");
        safe_exit(EXIT_ERR_SOCKET);
    }
    old_flags &= ~O_NONBLOCK;
    if(fcntl(sock, F_SETFL, old_flags) == -1)
    {
        EPRINT("init_sched: error setting socket flags.\n");
        perror("init_sched");
        safe_exit(EXIT_ERR_SOCKET);
    }

    pthread_mutex_init(&sched_mut, NULL);
    tcp_in_flight = 0;

    /* Semaphore starts at 0: the scheduler sleeps until work arrives. */
    if(sem_init(&send_sem, 0, 0)) {
        EPRINT("init_sched: error initialising semaphore.\n");
        safe_exit(EXIT_ERR_SEM_INIT);
    }

    /* The two fixed-priority queues are always in use. */
    hipri_pq.state = QUEUE_STATE_USED;
    hipri_pq.tab = (Tpacket_data **) xmalloc(sizeof(Tpacket_data*)*QUEUE_SIZE_HI_PRI);
    hipri_pq.max = QUEUE_SIZE_HI_PRI;
    hipri_pq.head = 0;
    hipri_pq.tail = 0;
    lowpri_pq.state = QUEUE_STATE_USED;
    lowpri_pq.tab = (Tpacket_data **) xmalloc(sizeof(Tpacket_data*)*QUEUE_SIZE_LOW_PRI);
    lowpri_pq.max = QUEUE_SIZE_LOW_PRI;
    lowpri_pq.head = 0;
    lowpri_pq.tail = 0;

    /* TCP queues start FREE; alloc_tcp_queue() hands them out later. */
    for(i=0; i<TCP_MAX_QUEUES; i++) {
        q = &tcp_pq[i];
        q->state = QUEUE_STATE_FREE;
        q->tab = (Tpacket_data **) xmalloc(sizeof(Tpacket_data*)*QUEUE_SIZE_TCP);
        q->max = QUEUE_SIZE_TCP;
        q->head = 0;
        q->tail = 0;
        q->in_flight = 0;
        q->dc = 0;
        q->weight = QUEUE_WEIGHT_NORMAL;
        q->sent_hd = NULL;
    }

    queued_pkts = 0;
}
00140
00141
00142
00143
00144
00145 inline int queue_size(Tpacket_queue *q)
00146 {
00147 return ((q->tail >= q->head) ?
00148 (q->tail - q->head) :
00149 (q->tail + q->max - q->head));
00150 }
00151
00152
00153
00154
00155
00156 inline int queue_empty(Tpacket_queue *q)
00157 {
00158 return (q->head == q->tail);
00159 }
00160
00161
00162
00163
00164
00165
00166
00167 void packet_timeout(void *arg)
00168 {
00169 Tpacket_sent *sent = (Tpacket_sent*)arg;
00170 assert(sent != NULL);
00171
00172 mutex_lock("scheduler", &sched_mut);
00173
00174 if(sent->size > 0) {
00175 Tpacket_queue *q = sent->q;
00176 DPRINT("packet_timeout: sent record still active, seq=%u, size=%u.\n", sent->seq, sent->size);
00177 if(q->sent_hd == sent) {
00178 q->sent_hd = sent->next;
00179 } else {
00180 assert(sent->prev != NULL);
00181 sent->prev->next = sent->next;
00182 }
00183 assert(sent->size <= q->in_flight);
00184 q->in_flight -= sent->size;
00185 tcp_in_flight -= sent->size;
00186 } else {
00187
00188 }
00189 xfree(sent);
00190 mutex_unlock("scheduler", &sched_mut);
00191 }
00192
00193
00194
00195
00196
00197
00198 void add_tcp_sent(Tpacket_data *pd, unsigned size, Tpacket_queue *q)
00199 {
00200 Tpacket_sent *sent;
00201 Tpacket_sent *ptr, *prev_ptr;
00202
00203
00204 assert(size > 0);
00205
00206 sent = (Tpacket_sent*) xmalloc(sizeof(Tpacket_sent));
00207 sent->q = q;
00208 sent->seq = ntohl(pd->trans.tcp->seq);
00209 sent->size = size;
00210 sent->mtime = get_mclock();
00211 sent->prev = sent->next = NULL;
00212
00213 prev_ptr = NULL;
00214 for(ptr=q->sent_hd; ptr; ptr=ptr->next) {
00215 if(seq_cmp(sent->seq, ptr->seq) < 0)
00216 break;
00217 prev_ptr = ptr;
00218 }
00219
00220 if(prev_ptr) {
00221 prev_ptr->next = sent;
00222 sent->prev = prev_ptr;
00223 } else {
00224 q->sent_hd = sent;
00225 }
00226 sent->next = ptr;
00227 if(ptr)
00228 ptr->prev = sent;
00229
00230 add_timer(TCP_PACKET_TIMEOUT, packet_timeout, sent);
00231 }
00232
00233
00234
00235
00236
00237
00238
00239 int alloc_tcp_queue(unsigned weight)
00240 {
00241 int i;
00242 mutex_lock("alloc_tcp_queue", &sched_mut);
00243
00244 for(i=0; i<TCP_MAX_QUEUES; i++) {
00245 if(tcp_pq[i].state == QUEUE_STATE_FREE)
00246 break;
00247 }
00248 if(i < TCP_MAX_QUEUES) {
00249 tcp_pq[i].state = QUEUE_STATE_USED;
00250 tcp_pq[i].weight = weight;
00251 DPRINT("alloc_tcp_queue: allocated tcp queue %d with weight %u.\n", i, weight);
00252 } else {
00253 DPRINT("alloc_tcp_queue: no free queues.\n");
00254 i = -1;
00255 }
00256 mutex_unlock("alloc_tcp_queue", &sched_mut);
00257
00258 return i;
00259 }
00260
00261
00262
00263
00264
00265
00266
00267 int close_tcp_queue(int queue_num)
00268 {
00269 DPRINT("close_tcp_queue: queue %d\n", queue_num);
00270 assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00271
00272 mutex_lock("close_tcp_queue", &sched_mut);
00273
00274 assert(tcp_pq[queue_num].state == QUEUE_STATE_USED);
00275 tcp_pq[queue_num].state = QUEUE_STATE_CLOSED;
00276
00277 mutex_unlock("close_tcp_queue", &sched_mut);
00278
00279 return 0;
00280 }
00281
00282
00283
00284
00285
00286
00287
00288
/*
 * Discard every queued packet and sent-record of TCP queue queue_num and
 * return its in-flight bytes to the global budget.  Callers hold
 * sched_mut (reset_tcp_queue locks it; cleanup_sched runs at shutdown).
 * Returns the number of queued packets that were dropped.
 */
int clear_tcp_queue(int queue_num)
{
    Tpacket_queue *q;
    Tpacket_sent *sent;
    int queued;
    int sent_rec;            /* sent-records released, for debug output */
    unsigned unacked_bytes;  /* bytes summed off the sent list, cross-checked
                                against q->in_flight below */
    int i;

    assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);

    q = &tcp_pq[queue_num];
    queued = queue_size(q);
    assert(queued >= 0);

    DPRINT("clear_tcp_queue: clearing queue %d, discarding %d packets.\n",
           queue_num, queued);

    /* Tear down the sent list. */
    unacked_bytes = 0;
    sent_rec = 0;
    while(q->sent_hd != NULL) {
        sent = q->sent_hd;

        unacked_bytes += sent->size;
        q->sent_hd = sent->next;
        sent_rec++;
        /* size = 0 tells a concurrently firing packet_timeout() that the
           bytes have already been credited, so it must not credit again. */
        sent->size = 0;

        /* Free only if the timer was actually cancelled; if clear_timer
           fails (-1), packet_timeout() owns the record and frees it. */
        if(clear_timer(packet_timeout, sent) != -1) {
            xfree(sent);
        }
    }
    if(DEBUG && sent_rec > 0)
        DPRINT("clear_tcp_queue: cleared %d sent records.\n", sent_rec);

    /* Return this queue's outstanding window bytes to the global pool. */
    if(q->in_flight > 0)
    {
        DPRINT("clear_tcp_queue: in_flight=%u, unacked_bytes=%u, ", q->in_flight, unacked_bytes);
        if(q->in_flight != unacked_bytes) {
            DPRINT("values do NOT match.\n");
        } else {
            DPRINT("values match ok.\n");
        }
        tcp_in_flight -= q->in_flight;
        q->in_flight = 0;
    }

    /* Drop the packets still waiting in the circular buffer. */
    for(i=0; i<queued; i++) {
        dealloc_packet_data(q->tab[q->head]);
        q->head = (q->head+1) % q->max;
        queued_pkts--;
    }
    assert(queue_empty(q));
    assert(queued_pkts >= 0);

    /* Window credit was freed: wake the scheduler. */
    sem_post(&send_sem);

    return queued;
}
00351
00352
00353
00354
00355
00356
00357 int reset_tcp_queue(int queue_num)
00358 {
00359
00360 assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00361
00362 mutex_lock("reset_tcp_queue", &sched_mut);
00363
00364 assert(tcp_pq[queue_num].state == QUEUE_STATE_USED);
00365 clear_tcp_queue(queue_num);
00366
00367 mutex_unlock("reset_tcp_queue", &sched_mut);
00368
00369 return 0;
00370 }
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380 inline int check_dealloc_tcp_queue(int queue_num)
00381 {
00382 Tpacket_queue *q;
00383
00384 assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00385 q = &tcp_pq[queue_num];
00386 if(q->state == QUEUE_STATE_CLOSED && queue_empty(q) && q->in_flight == 0) {
00387 DPRINT("check_dealloc_tcp_queue: deallocating queue %d\n", queue_num);
00388 assert(q->head == q->tail);
00389 q->state = QUEUE_STATE_FREE;
00390 return 1;
00391 } else {
00392 return 0;
00393 }
00394 }
00395
00396
00397
00398
00399
00400 void cleanup_sched()
00401 {
00402 Tpacket_queue *q;
00403 int i;
00404
00405
00406
00407 for(i=0; i<TCP_MAX_QUEUES; i++) {
00408 q = &tcp_pq[i];
00409 if(q->state != QUEUE_STATE_FREE) {
00410 clear_tcp_queue(i);
00411 q->state = QUEUE_STATE_FREE;
00412 }
00413 xfree(q->tab);
00414 }
00415
00416 close(sock);
00417 sem_destroy(&send_sem);
00418 }
00419
00420
00421
00422
00423
00424 int send_packet (Tpacket_data *pd)
00425 {
00426 struct sockaddr_in dst;
00427 int len = pd->ipq_pm.data_len;
00428 int sent;
00429
00430 dst.sin_family = AF_INET;
00431
00432
00433 memcpy(&dst.sin_addr, &pd->ip->daddr, sizeof(pd->ip->daddr));
00434
00435 DPRINT("send_packet: sending packet, len=%u, ip_id=%u.\n",
00436 len, ((struct iphdr*)(pd->payload))->id);
00437 sent = sendto(sock, pd->payload, len, 0, (struct sockaddr *)&dst, sizeof dst);
00438 if(sent == -1) {
00439 EPRINT("send_packet: error sending packet.\n");
00440 perror("send_packet");
00441 } else if(sent != len) {
00442 EPRINT("send_packet: sent bytes not equal to packet size: sent=%d\n",
00443 sent, len);
00444 }
00445 dealloc_packet_data(pd);
00446
00447 return sent;
00448 }
00449
00450
00451
00452
00453
00454
00455 int queue_packet(Tpacket_data *pd, Tpacket_queue* q)
00456 {
00457 mutex_lock("queue_packet", &sched_mut);
00458
00459 if(queue_size(q) >= q->max) {
00460 DPRINT("queue_packet: queue is full, dropping packet.\n");
00461 mutex_unlock("queue_packet", &sched_mut);
00462 dealloc_packet_data(pd);
00463 return 1;
00464 }
00465
00466 q->tab[q->tail] = pd;
00467 q->tail = (q->tail+1) % q->max;
00468 queued_pkts++;
00469
00470 if(DEBUG) {
00471 int qnum = q-tcp_pq;
00472
00473 if((qnum >= 0) && (qnum < TCP_MAX_QUEUES)) {
00474 DPRINT("queue_packet: tcp queue %d, ", qnum);
00475 } else if (q == &hipri_pq) {
00476 DPRINT("queue_packet: queue hipri, ");
00477 } else if (q == &lowpri_pq) {
00478 DPRINT("queue_packet: queue lowpri, ");
00479 } else {
00480 DPRINT("queue_packet: queue unknown, ");
00481 }
00482 DPRINT("size=%d, queued_pkts=%ld\n", queue_size(q), queued_pkts);
00483 }
00484
00485 mutex_unlock("queue_packet", &sched_mut);
00486 sem_post(&send_sem);
00487
00488 return 0;
00489 }
00490
00491
00492
00493
00494
00495
00496 inline Tpacket_data* dequeue_packet(Tpacket_queue* q)
00497 {
00498 if(queue_empty(q)) {
00499 return NULL;
00500 } else {
00501 Tpacket_data *pd;
00502 pd = q->tab[q->head];
00503 q->head = (q->head+1) % q->max;
00504 queued_pkts--;
00505 assert(queued_pkts >= 0);
00506 return pd;
00507 }
00508 }
00509
00510
00511
00512
00513
00514
/*
 * Handle an ACK from the mobile host on TCP queue queue_num: release the
 * sent-records covered by ack_seq, return their bytes to the queue and
 * global in-flight budgets, and wake the scheduler.
 */
void mh_ack_rcvd(unsigned ack_seq, int queue_num)
{
    /* NOTE(review): queue_num is used as an index without the bounds
       assert the other entry points have — confirm callers validate it. */
    Tpacket_queue *q = &tcp_pq[queue_num];
    Tpacket_sent *sent;
    unsigned credit;

    mutex_lock("mh_ack_rcvd", &sched_mut);

    credit = 0;
    /* NOTE(review): this releases a record as soon as ack_seq is past its
       *starting* sequence number (seq_cmp > 0), so a partially acked
       segment is credited in full — confirm this approximation is
       intended rather than comparing against seq + size. */
    while(q->sent_hd != NULL && seq_cmp(ack_seq, q->sent_hd->seq) >= 1) {
        sent = q->sent_hd;
        credit += sent->size;
        q->sent_hd = sent->next;
        /* size = 0 marks the record as credited, so a packet_timeout()
           racing with us will not subtract the bytes again. */
        sent->size = 0;

        /* Free only if the timer was cancelled; otherwise the firing
           packet_timeout() owns the record and frees it itself. */
        if(clear_timer(packet_timeout, sent) != -1) {
            xfree(sent);
        }
    }
    if(q->sent_hd != NULL)
        q->sent_hd->prev = NULL;   /* new list head has no predecessor */

    assert(credit <= q->in_flight);
    q->in_flight -= credit;
    tcp_in_flight -= credit;
    DPRINT("mh_ack_rcvd: ack_seq=%u, queue=%d, credit=%u, new tcp_in_flight=%u(%u)\n",
           ack_seq, queue_num, credit, tcp_in_flight, q->in_flight);

    /* The queue may be closed and now fully drained. */
    check_dealloc_tcp_queue(queue_num);

    mutex_unlock("mh_ack_rcvd", &sched_mut);
    sem_post(&send_sem);
}
00550
00551
00552
00553
00554
00555 int queue_tcp(Tpacket_data *pd, int queue_num)
00556 {
00557 Tpacket_queue *q = &tcp_pq[queue_num];
00558 struct tcphdr *tcp = pd->trans.tcp;
00559
00560 assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00561
00562 if(pd->dir == PACKET_DIR_IN) {
00563 if(tcp->syn) {
00564
00565 return queue_packet(pd, &hipri_pq);
00566 } else {
00567 return queue_packet(pd, q);
00568 }
00569 } else if(pd->dir == PACKET_DIR_OUT) {
00570 DPRINT("queue_tcp: packet dir=out, sending immediately\n");
00571 send_packet(pd);
00572 return 0;
00573 } else {
00574 assert(UNREACHABLE);
00575 }
00576 }
00577
00578
00579
00580
00581
00582 int queue_hipri(Tpacket_data *pd)
00583 {
00584 if(pd->dir == PACKET_DIR_IN) {
00585 return queue_packet(pd, &hipri_pq);
00586 } else if(pd->dir == PACKET_DIR_OUT) {
00587 DPRINT("queue_hipri: packet dir=out, sending immediately\n");
00588 send_packet(pd);
00589 return 0;
00590 } else {
00591 assert(UNREACHABLE);
00592 }
00593 }
00594
00595
00596
00597
00598
00599 int queue_lowpri(Tpacket_data *pd)
00600 {
00601 if(pd->dir == PACKET_DIR_IN) {
00602 return queue_packet(pd, &lowpri_pq);
00603 } else if(pd->dir == PACKET_DIR_OUT) {
00604 DPRINT("queue_lowpri: packet dir=out, sending immediately\n");
00605 send_packet(pd);
00606 return 0;
00607 } else {
00608 assert(UNREACHABLE);
00609 }
00610 }
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620 void scheduler (void *sig_mask)
00621 {
00622 register Tpacket_queue *q;
00623 Tpacket_data *pd;
00624 unsigned tcp_size;
00625 int qnum;
00626 int wait_on_link;
00627 unsigned curr_dc;
00628
00629
00630 if(pthread_sigmask(SIG_BLOCK, sig_mask, NULL)) {
00631 EPRINT("scheduler: error blocking signals.\n");
00632 }
00633
00634 qnum = 0;
00635 curr_dc = 0;
00636
00637 while(!request_exit) {
00638 if(wait_on_link || queued_pkts == 0) {
00639 sem_wait(&send_sem);
00640 }
00641 wait_on_link = 0;
00642
00643 mutex_lock("scheduler", &sched_mut);
00644
00645
00646 while((pd = dequeue_packet(&hipri_pq)) != NULL) {
00647 send_packet(pd);
00648 DPRINT("scheduler: queue hipri, size=%d, queued_pkts=%ld\n",
00649 queue_size(&hipri_pq), queued_pkts);
00650 }
00651
00652 if(qnum >= TCP_MAX_QUEUES) {
00653 qnum = 0;
00654 }
00655 for(; qnum<TCP_MAX_QUEUES; qnum++) {
00656 q = &tcp_pq[qnum];
00657
00658 if(q->state == QUEUE_STATE_FREE) {
00659 continue;
00660
00661 } else if(!queue_empty(q)) {
00662 if(curr_dc == 0) {
00663 curr_dc = q->dc;
00664 curr_dc += DRR_QUANTUM * q->weight;
00665 }
00666 while(!queue_empty(q)) {
00667 pd = q->tab[q->head];
00668 tcp_size = tcp_data_size(pd);
00669 if(curr_dc < tcp_size) {
00670 break;
00671 }
00672 if(tcp_in_flight + tcp_size > max_tcp_in_flight) {
00673 wait_on_link = 1;
00674 break;
00675 }
00676 if(tcp_size > 0) {
00677 q->in_flight += tcp_size;
00678 tcp_in_flight += tcp_size;
00679 curr_dc -= tcp_size;
00680 add_tcp_sent(pd, tcp_size, q);
00681 }
00682 q->head = (q->head+1) % q->max;
00683 queued_pkts--;
00684 assert(queued_pkts >= 0);
00685
00686 send_packet(pd);
00687
00688 DPRINT("scheduler: queue %d, size=%d, tcp_in_flight=%u(%u), "
00689 "dc=%u, queued_pkts=%ld\n",
00690 qnum, queue_size(q), tcp_in_flight, q->in_flight, curr_dc, queued_pkts);
00691 }
00692
00693 if(wait_on_link) {
00694 DPRINT("scheduler: queue %d, link full; will wait on send_sem.\n", qnum);
00695 break;
00696
00697 } else {
00698 if(queue_empty(q))
00699 q->dc = 0;
00700 else
00701 q->dc = curr_dc;
00702
00703 curr_dc = 0;
00704 }
00705 }
00706 check_dealloc_tcp_queue(qnum);
00707 }
00708
00709 if(!queue_empty(&lowpri_pq)) {
00710
00711 if(tcp_in_flight < max_tcp_in_flight) {
00712 while((pd = dequeue_packet(&lowpri_pq)) != NULL) {
00713 send_packet(pd);
00714 DPRINT("scheduler: queue lowpri, size=%d, queued_pkts=%ld\n",
00715 queue_size(&lowpri_pq), queued_pkts);
00716 }
00717 } else {
00718 wait_on_link = 1;
00719 }
00720 }
00721
00722 mutex_unlock("scheduler", &sched_mut);
00723 }
00724 }