Main Page | Class Hierarchy | Class List | File List | Class Members

SegmentTransport.cs

00001 using System;
00002 using System.Net;
00003 using System.Net.Sockets;
00004 using System.Collections;
00005 using System.Threading;
00006 
00007 namespace ProtocolStack {
00008 
00014         public class SegmentTransmitter {
00015                 RemoteHostComms thisRemoteHost; 
00016                 StackInterface thisStack;
00017                 UdpClient myUDP;
00018 
00019                 Segment workingSegment;
00020                 IPAddress currentDestination;
00021                 Thread processingThread;
00022 
00027                 public void Start() {
00028                         if (processingThread == null || !processingThread.IsAlive) {
00029                                 Console.WriteLine("Starting Segment Transmitter...");
00030                                 try {
00031                                         StackSettings s = thisStack.Settings;
00032                                         myUDP = new UdpClient(new IPEndPoint(thisStack.ipMappings.Lookup(thisStack.settings.MyDeviceID), s.MyDeviceID == s.SERVER_DEVICE_ID ? s.ServerSendFromPortNumber : s.ClientSendFromPortNumber));
00033                                 } catch (ApplicationException appEx) {
00034                                         throw new ApplicationException(String.Format("Unable to create transmission socket", thisStack.ipMappings.Lookup(thisStack.settings.MyDeviceID)), appEx);
00035                                 } catch (SocketException sockEx) {
00036                                         throw new ApplicationException(String.Format("Unable to create transmission socket for IP Address [{0}]", thisStack.ipMappings.Lookup(thisStack.settings.MyDeviceID)), sockEx);
00037                                 }
00038                                 processingThread = new Thread(new ThreadStart(Run));
00039                                 processingThread.Start();
00040                                 Console.WriteLine("...Segment Transmitter Started.");
00041                         }
00042                 }
00043 
00048                 public void Stop() {
00049                         if (processingThread != null && processingThread.IsAlive) {
00050                                 processingThread.Abort();
00051                                 processingThread.Join();
00052                                 myUDP.Close();
00053                         }
00054                 }
00055 
00061                 public SegmentTransmitter(RemoteHostComms thisRemoteHost) {
00062                         this.thisRemoteHost = thisRemoteHost;
00063                         this.thisStack = thisRemoteHost.thisStack;
00064                 }
00065 
00070                 public void Run() {
00071                         /* Takes next segment from transmit queue. 
00072                          * Looks up destination IP
00073                          * Fills in any Just In Time fields
00074                          * Send (rate control?)
00075                          * Repeat As Needed */
00076                         try {
00077                                 while (true) {
00078                                         // get next queue entry
00079                                         TransmissionQueueEntry currentQueueEntry; 
00080                                         if (thisRemoteHost.readyToSendUserData) {
00081                                                 currentQueueEntry = thisRemoteHost.TxQueue.DequeueBlocking(); // blocks
00082                                         } else { // if we're waiting to be ready to send user daat
00083                                                 Console.WriteLine("### Not ready to send ###");
00084                                                 lock(thisRemoteHost.AckManager) {
00085                                                         Console.WriteLine("### Waiting for ACKable to be ready ###");
00086                                                         while (!thisRemoteHost.AckManager.IsAckableWaiting()) {
00087                                                                 Monitor.Wait(thisRemoteHost.AckManager);
00088                                                                 Console.WriteLine("Woke up");
00089                                                         }
00090                                                         Console.WriteLine("### ACKable ready, creating empty message carrier");
00091                                                         currentQueueEntry = TransmissionQueueEntry.FromMessage(new EmptyMessage(thisRemoteHost.RemoteDeviceID), true)[0];
00092                                                 }
00093                                         }
00094                                         BlockForCreditAndSend(currentQueueEntry);
00095                                 }
00096                         } catch (ThreadAbortException) {
00097                                 // close down
00098                         }
00099                 }
00100 
00101                 private void BlockForCreditAndSend(TransmissionQueueEntry currentQueueEntry) {
00102                         bool originalUnsent = true;
00103                         while (originalUnsent) {
00104                                 if (thisRemoteHost.RateManager.WaitForSend(currentQueueEntry.segmentData.DataLength, 10)) {  // if we're ok to send
00105                                         SendTQE(currentQueueEntry);
00106                                         originalUnsent = false;
00107                                 } else {
00108                                         // construct no-op mesage and use that to carry credit info
00109                                         EmptyMessage msg = new EmptyMessage();
00110                                         msg.Destination = currentQueueEntry.destinationDeviceID;
00111                                         TransmissionQueueEntry[] entryArray = TransmissionQueueEntry.FromMessage(msg, true); // should only be 1 segment!
00112                                         Console.WriteLine("    10 seconds elapsed without credit to send - sending credit update message to try and free deadlock *");
00113                                         SendTQE(entryArray[0]);
00114                                         originalUnsent = true;
00115                                 }
00116                         }
00117                 }
00118 
00119                 private void SendTQE(TransmissionQueueEntry currentQueueEntry) {
00121                         workingSegment = new Segment();
00122                         uint sequenceOffset = 0;
00123                         RateInfo rateInfo = null;
00124                         /* Gathering Data For Headers */
00125                         bool isAckable = thisRemoteHost.AckManager.IsAckableWaiting();
00126                         AckInfo ackInfo = thisRemoteHost.AckManager.GetNextAck();
00127                         workingSegment.Headers = new SegmentHeaders();
00128                         NackInfo nackInfo = thisRemoteHost.NackManager.GetNextNack();
00129                         uint seqNum = thisRemoteHost.SeqManager.GetNextSequenceNumber();
00130                         bool isResynch = thisRemoteHost.SeqManager.IsResynching;
00131                         if (isResynch) {
00132                                 sequenceOffset = thisRemoteHost.SeqManager.GetSequenceOffset(seqNum);
00133                         } else {
00134                                 rateInfo = thisRemoteHost.RateManager.GetRateInfoForTx(isAckable);
00135                         }
00136                         // TODO: Add Try / Catch here (looking up IP address of destination)
00137                         currentDestination = thisStack.ipMappings.Lookup(currentQueueEntry.destinationDeviceID); // unneeded? 
00138 
00139                         /* Assign Segment Data */
00140                         workingSegment.Data = currentQueueEntry.segmentData;
00141 
00142                         /* Fill in Segment Headers */
00143                         /* Initial Flags */
00144                         workingSegment.Headers.IsAckable = isAckable;
00145                         workingSegment.Headers.IsResynch = isResynch;
00146                         workingSegment.Headers.IsFirstInMessage = currentQueueEntry.isMessageStart;
00147                         workingSegment.Headers.IsLastInMessage = currentQueueEntry.isMessageEnd;
00148 
00149                         /* Source Device ID and Sequence Number */
00150                         workingSegment.Headers.SourceDeviceID = thisStack.settings.MyDeviceID;
00151                         workingSegment.Headers.SequenceNumber = seqNum;
00152 
00153                         /* ACKs and NACKs */
00154                         if (ackInfo == null) { // no ACKs today
00155                                 workingSegment.Headers.ClearAck();
00156                         } else { // ACK that segment!
00157                                 workingSegment.Headers.SetAck(ackInfo);
00158                         }
00159                         if (nackInfo == null) { // no NACKs today
00160                                 workingSegment.Headers.ClearNACKStatus();
00161                         } else { // NACK that segment!
00162                                 workingSegment.Headers.SetNACKStatus(nackInfo);
00163                                 Console.WriteLine("Adding NACK on outgoing segment #{0} for retrans of seg #{1}", workingSegment.Headers.SequenceNumber, nackInfo.SequenceNumber);
00164                         }
00165                                 
00166                         /* Synch or rate info */
00167                         if (isResynch) {
00168                                 workingSegment.Headers.SynchOrRate = sequenceOffset;
00169                         } else {
00170                                 workingSegment.Headers.SynchOrRate = rateInfo.data;
00171                         }
00172 
00173                         /* Time stamp and send */
00174                         workingSegment.Headers.TimeSent = DateTime.Now.ToUniversalTime();
00175                         tempSend(workingSegment, currentDestination);
00176                         thisRemoteHost.RetransmissionBuffer.PutSegment(workingSegment);
00177                         thisRemoteHost.RateManager.DecreaseSendCredit(workingSegment.Data.DataLength);
00178 
00179                         /* If we sent an Ackable packet, make a note to expect a reply */
00180                         if (isAckable) { 
00181                                 thisRemoteHost.AckManager.SentAckableSegment(workingSegment.Headers.SequenceNumber);
00182                         }
00183                 }
00184 
00185                 public void tempSend(Segment targetSegment, IPAddress destination) {
00186                         byte[] datagram = targetSegment.ToByteArray();
00187                         StackSettings s = thisStack.Settings;
00188                         myUDP.Send(datagram, datagram.Length, new IPEndPoint(destination, (s.MyDeviceID == 0) ? s.ClientReceivePortNumber : s.ServerReceivePortNumber));
00189                         Console.WriteLine("  > Sent #{0}, Credit Onboard: {1}, ACKable: {2}, Size: {3}", targetSegment.Headers.SequenceNumber, targetSegment.Headers.SynchOrRate ,targetSegment.Headers.IsAckable, targetSegment.Data.DataLength);
00190                         //Console.WriteLine("  > Sent #{0}", targetSegment.Headers.SequenceNumber);
00191                 }
00192         }
00193 
00194         /*************************************************************************/
00195 
00196         public class SegmentReceiver {
00197                 /* Run's in it's own thread. 
00198                  * Gabs UDP packets from the UDP Rx port
00199                  * Turns those packets into segments and stamps a Rx time on them
00200                  * Sends segment to appropriate next stage (DeMux)*/
00202                 Thread processingThread;
00203                 StackInterface thisStack;
00204                 UdpClient udpListener;
00205 
00207                 public ReceiveQueue destinationQueue;
00208 
00210                 public SegmentReceiver(StackInterface thisStack) {
00211                         this.thisStack = thisStack;
00212                 }
00213 
00214                 public void Start() {
00215                         Console.WriteLine("Starting Segment Receiver...");
00216                         if (processingThread == null || !processingThread.IsAlive) {
00217                                 processingThread = new Thread(new ThreadStart(Listen));
00218                                 processingThread.Start();
00219                                 Console.WriteLine("... Segment Receiver Started.");
00220                         } else {
00221                                 Console.WriteLine("... Segment Receiver Already Started!");
00222                         }
00223                         
00224                 }
00225 
00226                 public void Stop() {
00227                         Console.WriteLine("Stopping Segment Receiver...");
00228                         if (processingThread != null && processingThread.IsAlive) {
00229                                 processingThread.Abort();
00230                                 if (udpListener != null) { udpListener.Close(); }
00231                                 processingThread.Join();
00232                                 Console.WriteLine("... Segment Receiver Stopped.");
00233                         } else {
00234                                 Console.WriteLine("... Segment Receiver Already Stopped!");
00235                         }
00236                 }
00237 
00239                 private void Listen() { 
00240                         IPEndPoint currentEndPoint;
00241                         Segment receivedSegment;
00242                         int localPortNumber = -1;
00243                         if (thisStack.settings.MyDeviceID == thisStack.Settings.SERVER_DEVICE_ID) { // we are the server
00244                                 localPortNumber = thisStack.Settings.ServerReceivePortNumber;
00245                         } else {// we are a client
00246                                 localPortNumber = thisStack.Settings.ClientReceivePortNumber;
00247                         }
00248                         // create UDP socket bound to listen on IP address from settings
00249                         udpListener = new UdpClient(new IPEndPoint(IPAddress.Any, localPortNumber));
00250                         try {
00251                                 while(true) {/* We want to do as little as possible here so we can run this in a tight loop */
00252                                         try {
00253                                                 // create end point to hold receieved address
00254                                                 currentEndPoint = new IPEndPoint(IPAddress.Any, localPortNumber);
00255                                                 receivedSegment = new Segment(udpListener.Receive(ref currentEndPoint));
00256                                                 receivedSegment.Headers.TimeReceived = DateTime.Now.ToUniversalTime();
00257                                                 Console.WriteLine("  < Got #{0}", receivedSegment.Headers.SequenceNumber);
00258                                                 thisStack.GetRemoteHostByID(receivedSegment.Headers.SourceDeviceID).AcceptIncomingSegment(receivedSegment);
00259                                         } catch (ThreadAbortException) {
00260                                         } catch (Exception ex) {
00261                                                 Console.Error.WriteLine("\n *** Error in Protocol Stack ***\nUnable to correctly receive segment.\nSkipping over and continuting. Details:\n\n(" + ex + ")");
00262                                         }
00263                                 }
00264                         } catch (ThreadAbortException) {
00265                                 Console.WriteLine("Segment Receive Listen Thread Aborted");
00266                         } finally {
00267                                 if (udpListener != null) { udpListener.Close(); }
00268                         }
00269         }
00270 }
00271 
00272         /*************************************************************************/
00276         public class ConnectionReset : ApplicationException {}
00277         /*************************************************************************/
00281         public delegate void ConnectionResetDelegate();
00282         /*************************************************************************/
00283 }

Generated on Mon May 8 22:07:27 2006 by  doxygen 1.3.9.1