Main Page | Class List | File List | Class Members | File Members

scheduler.c

Go to the documentation of this file.
00001 /* Packet transmission scheduler.
00002  *
00003  */
00004 
00005 #include <stdio.h>
00006 #include <stdlib.h>
00007 #include <string.h>
00008 #include <assert.h>
00009 #include <time.h>
00010 #include <semaphore.h>
00011 #include <pthread.h>
00012 #include <sys/socket.h>
00013 #include <linux/tcp.h>
00014 #include <linux/ip.h>
00015 #include <linux/in.h>
00016 
00017 #include "tcpproxy.h"
00018 #include "misc.h"
00019 #include "tcptimer.h"
00020 #include "tcp.h"
00021 #include "scheduler.h"
00022 
/* Lifecycle state of a packet queue.
 * Declared as a typedef (matching the Tpacket_* naming used in this file);
 * the previous form defined an unused global variable named Tqueue_state. */
typedef enum queue_state
{
    QUEUE_STATE_FREE = 0,   /* Slot unused; alloc_tcp_queue may hand it out.     */
    QUEUE_STATE_USED,       /* Queue in use.                                     */
    QUEUE_STATE_CLOSED      /* Closed; freed once empty with nothing in flight.  */
} Tqueue_state;
00029 
/* Record of a transmitted TCP packet that is still counted as in flight.
 * Records hang off their queue's sent_hd list, ordered by sequence number;
 * size is zeroed when the record is retired before its timer fires. */
struct packet_sent {
    struct packet_queue *q;     /* Queue the packet was sent from.         */
    unsigned seq;               /* Sequence number, host byte order.       */
    unsigned size;              /* TCP data bytes; 0 once retired.         */
    long mtime;                 /* Time sent in milliseconds.              */
    struct packet_sent *prev;   /* Previous record in the sent list.       */
    struct packet_sent *next;   /* Next record in the sent list.           */
};
typedef struct packet_sent Tpacket_sent;
00039 
00040 /* Circular FIFO packet queue. */
00041 typedef struct packet_queue {
00042     enum queue_state state; /* State of the queue.                           */
00043     Tpacket_data **tab;     /* Array of pointers to packet data blocks.      */
00044     int max;                /* Max size of queue.                            */
00045     int head;               /* Head position.                                */
00046     int tail;               /* Tail position.                                */
00047     unsigned in_flight;     /* Number of data bytes in flight over the link. */
00048     unsigned dc;            /* Deficit counter for DRR scheduling.           */
00049     unsigned weight;        /* Weight for DRR scheduling.                    */
00050     Tpacket_sent *sent_hd;  /* Head of linked list of sent packets.          */
00051 } Tpacket_queue;
00052 
00053 static int sock;                              /* Socket for tx packets.      */
00054 static Tpacket_queue tcp_pq[TCP_MAX_QUEUES];  /* Array of TCP packet queues. */
00055 static Tpacket_queue hipri_pq;                /* Queue for hi pri. packets.  */
00056 static Tpacket_queue lowpri_pq;               /* Queue for low pri. packets. */
00057 static pthread_mutex_t sched_mut;             /* Scheduler mutex.            */
00058 static sem_t send_sem;                        /* Send packet semaphore.      */
00059 static long queued_pkts;                      /* Total queued packets.       */
00060 static unsigned tcp_in_flight;       /* No. TCP bytes in flight on the link. */
00061 static int max_tcp_in_flight = TCP_WIN_CLAMP_SIZE;  /* Max data on the link. */
00062 
00063 /*----------------------------------------------------------------------
00064  * alloc_queues -  perform scheduler initialisation.
00065  *----------------------------------------------------------------------
00066  */
00067 void init_sched()
00068 {
00069     int i;
00070     Tpacket_queue *q;
00071     int old_flags;
00072     int on = 1; 
00073 
00074     //DPRINT("init_sched: entered.\n");
00075 
00076     sock = socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
00077 
00078     if(sock == -1) {
00079         EPRINT("init_sched: error opening raw datagram socket.\n");
00080         perror("init_sched");
00081         safe_exit(EXIT_ERR_SOCKET);
00082     }
00083 
00084     if(setsockopt(sock, IPPROTO_IP, IP_HDRINCL, &on, sizeof(on))) {
00085         EPRINT("init_sched: error setting IP_HDRINCL.\n");
00086         perror("init_sched");
00087         safe_exit(EXIT_ERR_SOCKET);
00088     }
00089 
00090     old_flags = fcntl (sock, F_GETFL, 0);  /* Get socket flags. */
00091     if (old_flags == -1) {
00092         EPRINT("init_sched: error reading socket flags.\n");
00093         perror("init_sched");
00094         safe_exit(EXIT_ERR_SOCKET);
00095     }
00096     old_flags &= ~O_NONBLOCK;  /* Clear non-blocking flag. */
00097     if(fcntl(sock, F_SETFL, old_flags) == -1)  /* Set socket flags. */
00098     {
00099         EPRINT("init_sched: error setting socket flags.\n");
00100         perror("init_sched");
00101         safe_exit(EXIT_ERR_SOCKET);     
00102     }
00103 
00104     pthread_mutex_init(&sched_mut, NULL);
00105     tcp_in_flight = 0;
00106 
00107     if(sem_init(&send_sem, 0, 0)) {
00108         EPRINT("init_sched: error initialising semaphore.\n");
00109         safe_exit(EXIT_ERR_SEM_INIT);
00110     }
00111 
00112     /* Initialise ICMP and other packet queues. */
00113     hipri_pq.state = QUEUE_STATE_USED;
00114     hipri_pq.tab = (Tpacket_data **) xmalloc(sizeof(Tpacket_data*)*QUEUE_SIZE_HI_PRI);
00115     hipri_pq.max = QUEUE_SIZE_HI_PRI;
00116     hipri_pq.head = 0;
00117     hipri_pq.tail = 0;
00118     lowpri_pq.state = QUEUE_STATE_USED;
00119     lowpri_pq.tab = (Tpacket_data **) xmalloc(sizeof(Tpacket_data*)*QUEUE_SIZE_LOW_PRI);
00120     lowpri_pq.max = QUEUE_SIZE_LOW_PRI;
00121     lowpri_pq.head = 0;
00122     lowpri_pq.tail = 0;
00123         
00124     /* Initialise the tcp packet queues. */
00125     for(i=0; i<TCP_MAX_QUEUES; i++) {
00126         q = &tcp_pq[i];
00127         q->state = QUEUE_STATE_FREE;
00128         q->tab = (Tpacket_data **) xmalloc(sizeof(Tpacket_data*)*QUEUE_SIZE_TCP);
00129         q->max = QUEUE_SIZE_TCP;
00130         q->head = 0;
00131         q->tail = 0;
00132         q->in_flight = 0;
00133         q->dc = 0;
00134         q->weight = QUEUE_WEIGHT_NORMAL;
00135         q->sent_hd = NULL;
00136     }
00137 
00138     queued_pkts = 0;
00139 }
00140 
00141 /*----------------------------------------------------------------------
00142  * queue_size -  return size of selected queue.
00143  *----------------------------------------------------------------------
00144  */
00145 inline int queue_size(Tpacket_queue *q)
00146 {
00147     return ((q->tail >= q->head) ?
00148             (q->tail - q->head) :
00149             (q->tail + q->max - q->head));
00150 }
00151 
00152 /*----------------------------------------------------------------------
00153  * queue_empty -  return true if the queue is empty.
00154  *----------------------------------------------------------------------
00155  */
00156 inline int queue_empty(Tpacket_queue *q)
00157 {
00158     return (q->head == q->tail);
00159 }
00160 
00161 /*----------------------------------------------------------------------
00162  * packet_timeout -  called by tcp_timer when a packet times out;
00163  *                   the packet may have already been acknowledged, in
00164  *                   which case the record of it is merely deallocated.
00165  *----------------------------------------------------------------------
00166  */
00167 void packet_timeout(void *arg)
00168 {
00169     Tpacket_sent *sent = (Tpacket_sent*)arg;
00170     assert(sent != NULL);
00171 
00172     mutex_lock("scheduler", &sched_mut);
00173 
00174     if(sent->size > 0) {  /* Sent record still active. */
00175         Tpacket_queue *q = sent->q;
00176         DPRINT("packet_timeout: sent record still active, seq=%u, size=%u.\n", sent->seq, sent->size);
00177         if(q->sent_hd == sent) {
00178             q->sent_hd = sent->next;
00179         } else {
00180             assert(sent->prev != NULL);
00181             sent->prev->next = sent->next;
00182         }
00183         assert(sent->size <= q->in_flight);
00184         q->in_flight -= sent->size;
00185         tcp_in_flight -= sent->size;
00186     } else {
00187         //DPRINT("packet_timeout: sent record inactive.\n");
00188     }
00189     xfree(sent);
00190     mutex_unlock("scheduler", &sched_mut);    
00191 }
00192 
00193 /*----------------------------------------------------------------------
00194  * add_tcp_sent -  add a record of a tcp packet to the queue's sent list;
00195  *                 assumes sched_mut HELD.
00196  *----------------------------------------------------------------------
00197  */
00198 void add_tcp_sent(Tpacket_data *pd, unsigned size, Tpacket_queue *q)
00199 {
00200     Tpacket_sent *sent;
00201     Tpacket_sent *ptr, *prev_ptr;
00202 
00203     //DPRINT("add_tcp_sent: entered, size=%u.\n", size);
00204     assert(size > 0);    
00205 
00206     sent = (Tpacket_sent*) xmalloc(sizeof(Tpacket_sent));
00207     sent->q = q;
00208     sent->seq = ntohl(pd->trans.tcp->seq);
00209     sent->size = size;
00210     sent->mtime = get_mclock();
00211     sent->prev = sent->next = NULL;
00212     
00213     prev_ptr = NULL;
00214     for(ptr=q->sent_hd; ptr; ptr=ptr->next) {
00215         if(seq_cmp(sent->seq, ptr->seq) < 0)  /* Need to insert event before ptr. */
00216             break;
00217         prev_ptr = ptr;
00218     }
00219 
00220     if(prev_ptr) {
00221         prev_ptr->next = sent;
00222         sent->prev = prev_ptr;
00223     } else {
00224         q->sent_hd = sent;
00225     }
00226     sent->next = ptr;
00227     if(ptr)
00228         ptr->prev = sent;
00229 
00230     add_timer(TCP_PACKET_TIMEOUT, packet_timeout, sent);
00231 }
00232 
00233 /*----------------------------------------------------------------------
00234  * alloc_tcp_queue -  allocate a tcp output queue;
00235  *                    returns the number of the allocated queue, or -1
00236  *                    if there are no free queues.
00237  *----------------------------------------------------------------------
00238  */
00239 int alloc_tcp_queue(unsigned weight)
00240 {
00241     int i;
00242     mutex_lock("alloc_tcp_queue", &sched_mut);
00243 
00244     for(i=0; i<TCP_MAX_QUEUES; i++) {
00245         if(tcp_pq[i].state == QUEUE_STATE_FREE)
00246             break;
00247     }
00248     if(i < TCP_MAX_QUEUES) {
00249         tcp_pq[i].state = QUEUE_STATE_USED;
00250         tcp_pq[i].weight = weight;
00251         DPRINT("alloc_tcp_queue: allocated tcp queue %d with weight %u.\n", i, weight);
00252     } else {
00253         DPRINT("alloc_tcp_queue: no free queues.\n");
00254         i = -1;
00255     }
00256     mutex_unlock("alloc_tcp_queue", &sched_mut);
00257 
00258     return i;
00259 }
00260 
00261 /*----------------------------------------------------------------------
00262  * close_tcp_queue -  puts tcp queue into the closed state, so that it
00263  *                    will be deallocated when it becomes empty and has
00264  *                    no data in flight.
00265  *----------------------------------------------------------------------
00266  */
00267 int close_tcp_queue(int queue_num)
00268 {
00269     DPRINT("close_tcp_queue: queue %d\n", queue_num);
00270     assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00271 
00272     mutex_lock("close_tcp_queue", &sched_mut);
00273 
00274     assert(tcp_pq[queue_num].state == QUEUE_STATE_USED);
00275     tcp_pq[queue_num].state = QUEUE_STATE_CLOSED;
00276 
00277     mutex_unlock("close_tcp_queue", &sched_mut);
00278 
00279     return 0;
00280 }
00281 
00282 /*----------------------------------------------------------------------
00283  * clear_tcp_queue -  clears a tcp queue, discarding any packets
00284  *                    still in the queue;
00285  *                    returns the number of packets discarded;
00286  *                    assumes sched_mut HELD.
00287  *----------------------------------------------------------------------
00288  */
00289 int clear_tcp_queue(int queue_num)
00290 {
00291     Tpacket_queue *q;
00292     Tpacket_sent *sent;
00293     int queued;
00294     int sent_rec;
00295     unsigned unacked_bytes;
00296     int i;
00297 
00298     assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00299 
00300     q = &tcp_pq[queue_num];
00301     queued = queue_size(q);
00302     assert(queued >= 0);
00303 
00304     DPRINT("clear_tcp_queue: clearing queue %d, discarding %d packets.\n",
00305            queue_num, queued);
00306 
00307     unacked_bytes = 0;
00308     sent_rec = 0;
00309     while(q->sent_hd != NULL) {
00310         sent = q->sent_hd;
00311         //DPRINT("clear_tcp_queue: sent packet record, seq=%u, size=%u.\n",
00312         //       sent->seq, sent->size);
00313         unacked_bytes += sent->size;
00314         q->sent_hd = sent->next;
00315         sent_rec++;
00316         sent->size = 0;
00317         /* If the timer is successfully cleared (i.e. it is not currently being
00318          * activated), then free sent. Otherwise, sent will be freed by packet_timeout. */
00319         if(clear_timer(packet_timeout, sent) != -1) {
00320             xfree(sent);
00321             //DPRINT("clear_tcp_queue: cleared timer.\n");
00322         }
00323     }
00324     if(DEBUG && sent_rec > 0)
00325         DPRINT("clear_tcp_queue: cleared %d sent records.\n", sent_rec);
00326 
00327     if(q->in_flight > 0)
00328     {
00329         DPRINT("clear_tcp_queue: in_flight=%u, unacked_bytes=%u, ", q->in_flight, unacked_bytes);
00330         if(q->in_flight != unacked_bytes) {
00331             DPRINT("values do NOT match.\n");
00332         } else {
00333             DPRINT("values match ok.\n");
00334         }
00335         tcp_in_flight -= q->in_flight;  /* Decrement global in flight counter. */
00336         q->in_flight = 0;
00337     }
00338 
00339     for(i=0; i<queued; i++) { /* Throw away all packets. */
00340         dealloc_packet_data(q->tab[q->head]);
00341         q->head = (q->head+1) % q->max;
00342         queued_pkts--;
00343     }
00344     assert(queue_empty(q));
00345     assert(queued_pkts >= 0);
00346 
00347     sem_post(&send_sem);  /* Signal send semaphore, as we may have reduced tcp_in_flight. */
00348 
00349     return queued;
00350 }
00351 
00352 /*----------------------------------------------------------------------
00353  * reset_tcp_queue -  clears all packets from a TCP queue - called when
00354  *                    an RST is seen.
00355  *-----------------------------------------------------------------------
00356  */
00357 int reset_tcp_queue(int queue_num)
00358 {
00359     //DPRINT("reset_tcp_queue: queue: %d\n", queue_num);
00360     assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00361 
00362     mutex_lock("reset_tcp_queue", &sched_mut);
00363 
00364     assert(tcp_pq[queue_num].state == QUEUE_STATE_USED);
00365     clear_tcp_queue(queue_num);
00366 
00367     mutex_unlock("reset_tcp_queue", &sched_mut);
00368 
00369     return 0;
00370 }
00371 
00372 /*----------------------------------------------------------------------
00373  * check_dealloc_tcp_queue -  check if a tcp queue is ready to be
00374  *                            deallocated, and calls clear_tcp_queue.
00375  *                            returns 1 if the queue is cleared, zero
00376  *                            otherwise.
00377  *                            assumes sched_mut HELD.
00378  *----------------------------------------------------------------------
00379  */
00380 inline int check_dealloc_tcp_queue(int queue_num)
00381 {
00382     Tpacket_queue *q;
00383     //DPRINT("check_dealloc_tcp_queue: queue_num=%d.\n", queue_num);
00384     assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00385     q = &tcp_pq[queue_num];
00386     if(q->state == QUEUE_STATE_CLOSED && queue_empty(q) && q->in_flight == 0) {
00387         DPRINT("check_dealloc_tcp_queue: deallocating queue %d\n", queue_num);
00388         assert(q->head == q->tail);
00389         q->state = QUEUE_STATE_FREE;
00390         return 1;
00391     } else {
00392         return 0;
00393     }
00394 }
00395 
00396 /*----------------------------------------------------------------------
00397  * cleanup_sched -  destroy objects and deallocate memory.
00398  *----------------------------------------------------------------------
00399  */
00400 void cleanup_sched()
00401 {
00402     Tpacket_queue *q;
00403     int i;
00404 
00405     //DPRINT("cleanup_sched: entered.\n");
00406 
00407     for(i=0; i<TCP_MAX_QUEUES; i++) {
00408         q = &tcp_pq[i];
00409         if(q->state != QUEUE_STATE_FREE) {
00410             clear_tcp_queue(i);
00411             q->state = QUEUE_STATE_FREE;
00412         }
00413         xfree(q->tab);
00414     }
00415 
00416     close(sock);
00417     sem_destroy(&send_sem);
00418 }
00419 
00420 /*----------------------------------------------------------------------
00421  * send_packet -  sends a packet via the raw network socket.
00422  *----------------------------------------------------------------------
00423  */
00424 int send_packet (Tpacket_data *pd)
00425 {
00426     struct sockaddr_in dst;
00427     int len = pd->ipq_pm.data_len;
00428     int sent;
00429 
00430     dst.sin_family = AF_INET;
00431 
00432     /* Copy destination address from ip header. */
00433     memcpy(&dst.sin_addr, &pd->ip->daddr, sizeof(pd->ip->daddr));
00434     
00435     DPRINT("send_packet: sending packet, len=%u, ip_id=%u.\n", 
00436            len, ((struct iphdr*)(pd->payload))->id);
00437     sent = sendto(sock, pd->payload, len, 0, (struct sockaddr *)&dst, sizeof dst);
00438     if(sent == -1) {
00439         EPRINT("send_packet: error sending packet.\n");
00440         perror("send_packet");
00441     } else if(sent != len) {
00442         EPRINT("send_packet: sent bytes not equal to packet size: sent=%d\n",
00443                sent, len);
00444     }
00445     dealloc_packet_data(pd);
00446 
00447     return sent;
00448 }
00449 
00450 /*----------------------------------------------------------------------
00451  * queue_packet -  push packet onto selected queue;
00452  *                 will block until there is space for the packet.
00453  *----------------------------------------------------------------------
00454  */
00455 int queue_packet(Tpacket_data *pd, Tpacket_queue* q)
00456 {
00457     mutex_lock("queue_packet", &sched_mut);
00458 
00459     if(queue_size(q) >= q->max) {  /* Check to see if queue is full. */
00460         DPRINT("queue_packet: queue is full, dropping packet.\n");
00461         mutex_unlock("queue_packet", &sched_mut);
00462         dealloc_packet_data(pd);
00463         return 1;
00464     }
00465         
00466     q->tab[q->tail] = pd;
00467     q->tail = (q->tail+1) % q->max;
00468     queued_pkts++;
00469 
00470     if(DEBUG) {
00471         int qnum = q-tcp_pq;
00472 
00473         if((qnum >= 0) && (qnum < TCP_MAX_QUEUES)) {
00474             DPRINT("queue_packet: tcp queue %d, ", qnum); 
00475         } else if (q == &hipri_pq) {
00476             DPRINT("queue_packet: queue hipri, ");
00477         } else if (q == &lowpri_pq) {
00478             DPRINT("queue_packet: queue lowpri, ");
00479         } else {
00480             DPRINT("queue_packet: queue unknown, ");
00481         }
00482         DPRINT("size=%d, queued_pkts=%ld\n", queue_size(q), queued_pkts);
00483     }
00484 
00485     mutex_unlock("queue_packet", &sched_mut);
00486     sem_post(&send_sem);  /* Indicate there are packets to send.   */
00487 
00488     return 0;
00489 }
00490 
00491 /*----------------------------------------------------------------------
00492  * dequeue_packet -  inline dequeue function, returns NULL on empty;
00493  *                   assumes sched_mut HELD.
00494  *----------------------------------------------------------------------
00495  */
00496 inline Tpacket_data* dequeue_packet(Tpacket_queue* q)
00497 {
00498     if(queue_empty(q)) {
00499         return NULL;
00500     } else {
00501         Tpacket_data *pd;
00502         pd = q->tab[q->head];            /* Get packet.                   */
00503         q->head = (q->head+1) % q->max;  /* Update queue head pointer.    */
00504         queued_pkts--;                   /* Decrement total packet count. */
00505         assert(queued_pkts >= 0);
00506         return pd;    
00507     }
00508 }
00509 
00510 /*----------------------------------------------------------------------
00511  * mh_ack_rcvd -  reduces in-flight counters based on sent records;
00512  *                called when an ack is received from the mobile host.
00513  *----------------------------------------------------------------------
00514  */
00515 void mh_ack_rcvd(unsigned ack_seq, int queue_num)
00516 {
00517     Tpacket_queue *q = &tcp_pq[queue_num];
00518     Tpacket_sent *sent;
00519     unsigned credit;
00520 
00521     mutex_lock("mh_ack_rcvd", &sched_mut);
00522        
00523     credit = 0;
00524     while(q->sent_hd != NULL && seq_cmp(ack_seq, q->sent_hd->seq) >= 1) {
00525         sent = q->sent_hd;
00526         credit += sent->size;
00527         q->sent_hd = sent->next;
00528         sent->size = 0;
00529         /* If the timer is successfully cleared (i.e. it is not currently being
00530          * activated), then free sent. Otherwise, sent will be freed by
00531          * packet_timeout. */
00532         if(clear_timer(packet_timeout, sent) != -1) {
00533             xfree(sent);
00534         }
00535     }
00536     if(q->sent_hd != NULL)
00537         q->sent_hd->prev = NULL;
00538     
00539     assert(credit <= q->in_flight);
00540     q->in_flight -= credit;
00541     tcp_in_flight -= credit;
00542     DPRINT("mh_ack_rcvd: ack_seq=%u, queue=%d, credit=%u, new tcp_in_flight=%u(%u)\n",
00543            ack_seq, queue_num, credit, tcp_in_flight, q->in_flight);
00544     
00545     check_dealloc_tcp_queue(queue_num);  /* Dealloc closed queue if possible. */
00546     
00547     mutex_unlock("mh_ack_rcvd", &sched_mut);    
00548     sem_post(&send_sem);
00549 }
00550 
00551 /*----------------------------------------------------------------------
00552  * queue_tcp -  queue a tcp packet to the specified tcp queue number.
00553  *----------------------------------------------------------------------
00554  */
00555 int queue_tcp(Tpacket_data *pd, int queue_num)
00556 {
00557     Tpacket_queue *q = &tcp_pq[queue_num];
00558     struct tcphdr *tcp = pd->trans.tcp;
00559     
00560     assert(queue_num >= 0 && queue_num < TCP_MAX_QUEUES);
00561     
00562     if(pd->dir == PACKET_DIR_IN) {
00563         if(tcp->syn) { /* Put packets with SYN set into high priority queue. */
00564             /* SYN count as 1 byte in flight, but we can ignore this. */
00565             return queue_packet(pd, &hipri_pq);
00566         } else {
00567             return queue_packet(pd, q);
00568         }
00569     } else if(pd->dir == PACKET_DIR_OUT) {
00570         DPRINT("queue_tcp: packet dir=out, sending immediately\n");
00571         send_packet(pd);
00572         return 0;
00573     } else {
00574         assert(UNREACHABLE);
00575     }
00576 }
00577 
00578 /*----------------------------------------------------------------------
00579  * queue_hipri -  queue packet as hipri; mainly for ICMP.
00580  *----------------------------------------------------------------------
00581  */
00582 int queue_hipri(Tpacket_data *pd)
00583 {
00584     if(pd->dir == PACKET_DIR_IN) {
00585         return queue_packet(pd, &hipri_pq);
00586     } else if(pd->dir == PACKET_DIR_OUT) {
00587         DPRINT("queue_hipri: packet dir=out, sending immediately\n");
00588         send_packet(pd);
00589         return 0;
00590     } else {
00591         assert(UNREACHABLE);
00592     }
00593 }
00594 
00595 /*----------------------------------------------------------------------
00596  * queue_lowpri -  queue packet as lowpri; mainly for UDP.
00597  *----------------------------------------------------------------------
00598  */
00599 int queue_lowpri(Tpacket_data *pd)
00600 {
00601     if(pd->dir == PACKET_DIR_IN) {
00602         return queue_packet(pd, &lowpri_pq);
00603     } else if(pd->dir == PACKET_DIR_OUT) {
00604         DPRINT("queue_lowpri: packet dir=out, sending immediately\n");
00605         send_packet(pd);
00606         return 0;
00607     } else {
00608         assert(UNREACHABLE);
00609     }
00610 }
00611 
00612 /*----------------------------------------------------------------------
00613  * scheduler -  entry point of scheduler thread;
00614  *              packets from TCP queues are transmitted whilst the link
00615  *              isn't full using token bucket and deficit round robin
00616  *              algorithms; when there are no packets or the link is full
00617  *              the scheduler waits on sched_mut.
00618  *----------------------------------------------------------------------
00619  */
00620 void scheduler (void *sig_mask)
00621 {
00622     register Tpacket_queue *q;
00623     Tpacket_data *pd;
00624     unsigned tcp_size;  /* Size of TCP data in a packet.               */
00625     int qnum;           /* Current TCP queue being scheduled.          */
00626     int wait_on_link;   /* Boolean: link is full, wait on link space.  */
00627     unsigned curr_dc;   /* Deficit counter for the current TCP queue;
00628                            zero unless we have to wait for link space. */
00629     
00630     if(pthread_sigmask(SIG_BLOCK, sig_mask, NULL)) {
00631         EPRINT("scheduler: error blocking signals.\n");
00632     }
00633     
00634     qnum = 0;
00635     curr_dc = 0;
00636     
00637     while(!request_exit) {
00638         if(wait_on_link || queued_pkts == 0) {
00639             sem_wait(&send_sem);
00640         }
00641         wait_on_link = 0;
00642         
00643         mutex_lock("scheduler", &sched_mut);
00644         
00645         /* Send from high priority queue ASAP. */
00646         while((pd = dequeue_packet(&hipri_pq)) != NULL) {
00647             send_packet(pd);
00648             DPRINT("scheduler: queue hipri, size=%d, queued_pkts=%ld\n",
00649                    queue_size(&hipri_pq), queued_pkts);
00650         }
00651         
00652         if(qnum >= TCP_MAX_QUEUES) {
00653             qnum = 0;
00654         }
00655         for(; qnum<TCP_MAX_QUEUES; qnum++) {
00656             q = &tcp_pq[qnum];
00657             
00658             if(q->state == QUEUE_STATE_FREE) {
00659                 continue;
00660                 
00661             } else if(!queue_empty(q)) {
00662                 if(curr_dc == 0) {                  
00663                     curr_dc = q->dc;
00664                     curr_dc += DRR_QUANTUM * q->weight;
00665                 }
00666                 while(!queue_empty(q)) {
00667                     pd = q->tab[q->head];  /* Get packet. */
00668                     tcp_size = tcp_data_size(pd);
00669                     if(curr_dc < tcp_size) {
00670                         break;
00671                     }
00672                     if(tcp_in_flight + tcp_size > max_tcp_in_flight) {
00673                         wait_on_link = 1;
00674                         break;  /* Go and wait on send_sem if link is full. */
00675                     }
00676                     if(tcp_size > 0) {
00677                         q->in_flight += tcp_size;
00678                         tcp_in_flight += tcp_size;
00679                         curr_dc -= tcp_size;
00680                         add_tcp_sent(pd, tcp_size, q);              
00681                     }
00682                     q->head = (q->head+1) % q->max;  /* Update queue head pointer. */
00683                     queued_pkts--;                   /* Decrement total packet count. */
00684                     assert(queued_pkts >= 0);
00685                     
00686                     send_packet(pd);
00687                     
00688                     DPRINT("scheduler: queue %d, size=%d, tcp_in_flight=%u(%u), "
00689                            "dc=%u, queued_pkts=%ld\n",
00690                            qnum, queue_size(q), tcp_in_flight, q->in_flight, curr_dc, queued_pkts);
00691                 }
00692                 
00693                 if(wait_on_link) {
00694                     DPRINT("scheduler: queue %d, link full; will wait on send_sem.\n", qnum);
00695                     break;
00696                     
00697                 } else {
00698                     if(queue_empty(q))
00699                         q->dc = 0;
00700                     else
00701                         q->dc = curr_dc;
00702                     
00703                     curr_dc = 0;
00704                 }
00705             }
00706             check_dealloc_tcp_queue(qnum);
00707         }
00708         
00709         if(!queue_empty(&lowpri_pq)) {
00710             /* Send low priority packets only if the link isn't full. */
00711             if(tcp_in_flight < max_tcp_in_flight) {
00712                 while((pd = dequeue_packet(&lowpri_pq)) != NULL) {
00713                     send_packet(pd);
00714                     DPRINT("scheduler: queue lowpri, size=%d, queued_pkts=%ld\n",
00715                            queue_size(&lowpri_pq), queued_pkts);
00716                 }
00717             } else {
00718                 wait_on_link = 1;
00719             }
00720         }
00721         
00722         mutex_unlock("scheduler", &sched_mut);
00723     }
00724 }

Generated on Sun May 14 13:36:52 2006 by  doxygen 1.4.2