Main Page | Class Hierarchy | Class List | File List | Class Members

ProtocolStack/ReceiveQueue.cs

00001 using System;
00002 using System.Threading;
00003 using System.Collections;
00004 
00005 namespace ProtocolStack
00006 {
/// <summary>
/// Queue of received <see cref="Segment"/> objects with blocking
/// enqueue/dequeue helpers and a worker thread that drains the queue.
/// Each dequeued segment is checked with the remote host's SeqManager;
/// processable segments have their headers run through the host's
/// Seq/Ack/Nack/Rate managers and are then handed to its MessageAssembler.
/// </summary>
/// <remarks>
/// Synchronization uses the monitor of the queue instance itself
/// (<c>lock(this)</c>). NOTE(review): this is kept as-is because code outside
/// this class may already lock on the queue instance -- confirm before
/// switching to a private lock object.
/// </remarks>
public class ReceiveQueue : Queue
{
    /// <summary>Remote host whose managers process the dequeued segments.</summary>
    private readonly RemoteHostComms thisRemoteHost;

    /// <summary>Segment most recently taken off the queue by the worker thread.</summary>
    private Segment workingSegment;

    /// <summary>Worker thread running <see cref="ProcessQueue"/>; null until <see cref="Start"/> is called.</summary>
    private Thread processingThread;

    /// <summary>
    /// Creates an empty receive queue bound to the given remote host.
    /// </summary>
    /// <param name="thisRemoteHost">Remote host whose managers will process received segments.</param>
    public ReceiveQueue(RemoteHostComms thisRemoteHost) : base() {
        this.thisRemoteHost = thisRemoteHost;
    }

    /// <summary>
    /// Adds a segment to the queue under the queue's monitor and pulses the
    /// monitor so that a thread waiting in <see cref="DequeueBlocking"/> can wake up.
    /// </summary>
    /// <param name="seg">Segment to append to the queue.</param>
    public void EnqueueBlocking(Segment seg) {
        lock (this) {
            Enqueue(seg);
            Monitor.Pulse(this);
        }
    }

    /// <summary>
    /// Removes and returns the segment at the head of the queue, waiting on
    /// the queue's monitor while the queue is empty.
    /// </summary>
    /// <returns>The dequeued segment.</returns>
    public Segment DequeueBlocking() {
        lock (this) {
            // Re-check Count after every wake-up: another thread may have
            // taken the item between the pulse and this thread re-acquiring the lock.
            while (Count == 0) {
                Monitor.Wait(this);
            }
            Segment head = (Segment)Dequeue();
            // Wakes one other waiter (if any); it re-checks Count in its own loop.
            Monitor.Pulse(this);
            return head;
        }
    }

    /// <summary>
    /// Starts the worker thread unless one is already alive.
    /// </summary>
    public void Start() {
        if (processingThread != null && processingThread.IsAlive) {
            return;
        }
        processingThread = new Thread(new ThreadStart(ProcessQueue));
        processingThread.Start();
    }

    /// <summary>
    /// Aborts the worker thread and waits for it to terminate. Does nothing
    /// if the worker was never started or is no longer alive.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>Thread.Abort</c> is only supported on .NET Framework;
    /// on .NET Core / .NET 5+ it throws <c>PlatformNotSupportedException</c>,
    /// so a cooperative stop signal would be needed there.
    /// </remarks>
    public void Stop() {
        Thread worker = processingThread;
        // Guard against Stop() being called before Start(): the field is
        // still null in that case and must not be dereferenced.
        if (worker != null && worker.IsAlive) {
            worker.Abort();
            worker.Join();
        }
    }

    /// <summary>
    /// Worker loop: repeatedly takes the next segment off the queue (blocking
    /// while it is empty) and processes it. Runs until the thread is aborted.
    /// </summary>
    private void ProcessQueue() {
        while (true) {
            workingSegment = this.DequeueBlocking(); // blocks

            // Discard any duplicate or old segments
            if (!thisRemoteHost.SeqManager.IsProcessable(workingSegment.Headers)) {
                continue; // drop and noop
            }

            ProcessSegmentHeaders(workingSegment.Headers, workingSegment.Data.DataLength);
            // send it for processing
            thisRemoteHost.MessageAssembler.Process(workingSegment);
        }
    }

    /// <summary>
    /// Passes the headers of a received segment to each of the remote host's
    /// protocol managers in turn: sequence, ACK, NACK, then rate.
    /// </summary>
    /// <param name="myHeaders">Headers of the received segment; passed by reference to each manager.</param>
    /// <param name="dataLength">Length of the segment's data, forwarded to the rate manager.</param>
    private void ProcessSegmentHeaders(SegmentHeaders myHeaders, uint dataLength) {
        /* updates protocol sequence number state, issues NACKs for missing segments */
        thisRemoteHost.SeqManager.ProcessReceivedSegmentHeaders(ref myHeaders);
        /* update ACK state */
        thisRemoteHost.AckManager.ProcessReceivedSegmentHeaders(ref myHeaders);
        /* retransmit if we need to */
        thisRemoteHost.NackManager.ProcessReceivedSegmentHeaders(ref myHeaders);
        /* make new rate calculations */
        thisRemoteHost.RateManager.ProcessReceivedSegmentHeaders(ref myHeaders, dataLength);
    }
}
00091 
00092 
00093 
00094 }

Generated on Mon May 8 22:07:27 2006 by  doxygen 1.3.9.1