Main Page | Class Hierarchy | Class List | File List | Class Members

ProtocolStack/MessageAssembly.cs

00001 using System;
00002 using System.Collections;
00003 using System.Collections.Specialized;
00004 using System.Threading;
00005 using Common;
00006 
00007 namespace ProtocolStack
00008 {
00009         /*************************************************************************/
/// <summary>
/// Turns incoming segments into complete messages on a dedicated worker
/// thread and hands the results to the stack's received-message queue.
/// </summary>
/// <remarks>
/// Producers call <see cref="Process"/>; the worker blocks on the assembly
/// line's SyncRoot monitor until a complete message can be extracted.
/// Retransmission messages are unwrapped and passed back to the remote host
/// they came from, chunk messages are collected until the chunked message is
/// whole, and every other message goes straight to the output queue.
/// </remarks>
public class MessageAssembly
{
    const string ShutdownNotice = "Shutting down message assembly";

    AssemblyLine assemblyLine; // segment storage and processing
    protected IncomingMessageQueue messageQueueOutput; // message destination
    protected StackInterface thisStack;
    ChunkAssembler chunkAssembler;
    Thread processingThread;

    // Set by Stop() so the worker leaves its loop even if it is not blocked
    // at the moment the interrupt is delivered.
    volatile bool stopRequested;

    /// <summary>
    /// Creates an assembler bound to the stack of <paramref name="remoteHost"/>;
    /// assembled messages are delivered to that stack's ReceivedMessages queue.
    /// </summary>
    public MessageAssembly(RemoteHostComms remoteHost) {
        thisStack = remoteHost.thisStack;
        assemblyLine = new AssemblyLine();
        chunkAssembler = new ChunkAssembler();
        messageQueueOutput = thisStack.ReceivedMessages;
    }

    /// <summary>
    /// Inserts a segment into the assembly line and wakes the worker thread.
    /// </summary>
    public void Process(Segment s) {
        lock (assemblyLine.SyncRoot) {
            assemblyLine.Add(s);
            if (s.IsRetransmission) {
                Console.WriteLine("Just added retransmission {0} to assembly line", s.Headers.SequenceNumber);
            }
            Monitor.PulseAll(assemblyLine.SyncRoot);
        }
    }

    /// <summary>
    /// Starts the worker thread with an empty assembly line and chunk
    /// assembler. Writes an error to stderr if the worker is already running.
    /// </summary>
    public void Start() {
        if (processingThread == null || !processingThread.IsAlive) {
            stopRequested = false;
            processingThread = new Thread(new ThreadStart(Run));
            // Process() may run concurrently on another thread, so the clear
            // takes the same lock as every other access to the assembly line.
            lock (assemblyLine.SyncRoot) {
                assemblyLine.Clear();
            }
            chunkAssembler.Init();
            processingThread.Start();
        } else {
            Console.Error.WriteLine("*** Error starting Message Assembly thread: Thread already started! ***");
        }
    }

    /// <summary>
    /// Asks the worker thread to finish and waits for it to exit.
    /// </summary>
    /// <remarks>
    /// Uses a stop flag plus Thread.Interrupt instead of Thread.Abort: Abort is
    /// unsupported on current .NET runtimes and can tear a thread down in the
    /// middle of a state change. Interrupt only takes effect while the worker
    /// is blocked (or the next time it blocks).
    /// NOTE(review): this assumes IncomingMessageQueue.EnqueueBlocking lets
    /// ThreadInterruptedException propagate - confirm against its source.
    /// </remarks>
    public void Stop() {
        if (processingThread != null && processingThread.IsAlive) {
            stopRequested = true;
            processingThread.Interrupt();
            processingThread.Join();
        }
    }

    /// <summary>Worker loop: extract one complete message, dispatch it, repeat.</summary>
    void Run() {
        Console.WriteLine("Started Assembly line assembler");
        try {
            while (!stopRequested) {
                Dispatch(WaitForCompleteMessage());
            }
            Console.WriteLine(ShutdownNotice);
        } catch (ThreadInterruptedException) {
            // Stop() woke us out of a blocking wait.
            Console.WriteLine(ShutdownNotice);
        } catch (ThreadAbortException) {
            // Still honoured in case some other code aborts this thread.
            Console.WriteLine(ShutdownNotice);
        }
    }

    /// <summary>
    /// Blocks until the assembly line holds a complete message, then removes
    /// and returns it.
    /// </summary>
    Message WaitForCompleteMessage() {
        lock (assemblyLine.SyncRoot) {
            while (!assemblyLine.HoldsCompleteMessage) {
                Monitor.Wait(assemblyLine.SyncRoot);
            }
            Message msg = assemblyLine.ExtractCompleteMessage();
            Monitor.PulseAll(assemblyLine.SyncRoot);
            return msg;
        }
    }

    /// <summary>Routes one assembled message according to its type.</summary>
    void Dispatch(Message msg) {
        if (msg.Type == MessageType.Retransmission) {
            // A retransmission wraps a raw segment: flag it, timestamp it and
            // feed it back in through the remote host it originated from.
            RetransmissionMessage retrans = (RetransmissionMessage)msg;
            Segment retransSegment = retrans.RawSegment;
            retransSegment.IsRetransmission = true;
            retransSegment.Headers.TimeReceived = DateTime.Now;
            Console.WriteLine("Just got retransmission of {0}. Adding to assembly. Retrans flag {1}", retrans.RawSegment.Headers.SequenceNumber, retrans.RawSegment.IsRetransmission);
            thisStack.GetRemoteHostByID(retransSegment.Headers.SourceDeviceID).AcceptIncomingSegment(retransSegment);
        } else if (msg.Type == MessageType.Chunk) {
            Console.WriteLine("  > Got Chunk");
            chunkAssembler.Add(msg);
            // One chunk can complete a message; drain everything that is ready.
            while (chunkAssembler.ContainsCompleteMessage()) {
                Message reChunkedMsg = chunkAssembler.ExtractCompleteMessage();
                if (reChunkedMsg == null) {
                    throw new ApplicationException("Extracted a Null chunked message");
                }
                Console.WriteLine(">>> Assembled Chunked Message");
                messageQueueOutput.EnqueueBlocking(reChunkedMsg);
            }
        } else {
            messageQueueOutput.EnqueueBlocking(msg);
        }
    }
}
00112 
00113 
00114         /*************************************************************************/
/// <summary>
/// Sorted store of received segments, keyed by sequence number, from which
/// complete messages (a contiguous run from a first-in-message segment to a
/// last-in-message segment) can be extracted.
/// </summary>
/// <remarks>
/// The class does no locking of its own; MessageAssembly serialises access
/// through SyncRoot. Adding a segment whose sequence number is already stored
/// throws ArgumentException (SortedList.Add behaviour).
/// </remarks>
class AssemblyLine : SortedList {
    // TODO: Include comparator that deals correctly with wraparound

    /// <summary>Number of stored segments flagged first-in-message.</summary>
    private int startCount;
    /// <summary>Number of stored segments flagged last-in-message.</summary>
    private int finishCount;

    /// <summary>Creates an empty assembly line.</summary>
    public AssemblyLine() : base() {
        startCount = 0;
        finishCount = 0;
    }

    /// <summary>
    /// Removes every stored segment and resets the start/finish counters so
    /// they keep describing the (now empty) contents.
    /// </summary>
    public override void Clear() {
        base.Clear();
        startCount = 0;
        finishCount = 0;
    }

    /// <summary>
    /// Stores <paramref name="seg"/> under its sequence number and updates the
    /// start/finish counters from its header flags.
    /// </summary>
    public void Add(Segment seg) {
        this.Add(seg.Headers.SequenceNumber, seg);
        if (seg.Headers.IsFirstInMessage) { startCount++; }
        if (seg.Headers.IsLastInMessage) { finishCount++; }
    }

    /// <summary>
    /// True when the line currently contains at least one complete message.
    /// </summary>
    public bool HoldsCompleteMessage {
        get {
            // Cheap pre-check: without at least one start and one finish
            // segment there is nothing worth scanning for.
            if (startCount <= 0 || finishCount <= 0) { return false; }

            int startIndex, finishIndex;
            IdentifyCompleteMessage(out startIndex, out finishIndex);
            return startIndex != -1 && finishIndex != -1;
        }
    }

    /// <summary>
    /// Removes the first complete message's segments from the line and
    /// returns the typed message built from their data.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The line does not currently hold a complete message.
    /// </exception>
    public Message ExtractCompleteMessage() {
        int startIndex, finishIndex;
        IdentifyCompleteMessage(out startIndex, out finishIndex);
        if (startIndex < 0 || finishIndex < 0) {
            throw new InvalidOperationException("Assembly line does not hold a complete message");
        }

        int segmentCount = finishIndex - startIndex + 1;
        SegmentData[] segmentData = new SegmentData[segmentCount];
        int removedStarts = 0;
        int removedFinishes = 0;
        for (int i = 0; i < segmentCount; i++) {
            Segment seg = (Segment)this.GetByIndex(startIndex + i);
            segmentData[i] = seg.Data;
            if (seg.Headers.IsFirstInMessage) { removedStarts++; }
            if (seg.Headers.IsLastInMessage) { removedFinishes++; }
        }

        // Remove back to front so the indices still to be removed stay valid.
        for (int i = finishIndex; i >= startIndex; i--) {
            RemoveAt(i);
        }

        // Adjust the counters before building the message: if construction
        // throws, the counters still match what is left in the list.
        startCount -= removedStarts;
        finishCount -= removedFinishes;

        return CreateTypedMessage(segmentData);
    }

    /// <summary>Writes the counters and a one-line summary of each stored segment to the console.</summary>
    public void PrintContents() {
        Console.Write("StartCount: {0}, FinishCount: {1}. Contents:", startCount, finishCount);
        for (int i = 0; i < this.Count; i++) {
            Segment s = (Segment)this.GetByIndex(i);
            Console.Write("[{0}] {1} {2}{3},", i, s.Headers.SequenceNumber, s.Headers.IsFirstInMessage ? "F" : "_", s.Headers.IsLastInMessage ? "L" : "_");
        }
        Console.WriteLine();
    }

    /// <summary>
    /// Finds the first run of segments that starts at a first-in-message
    /// segment, continues with consecutive sequence numbers and ends at a
    /// last-in-message segment.
    /// </summary>
    /// <param name="startIndex">Index of the run's first segment, or -1 if none was found.</param>
    /// <param name="finishIndex">Index of the run's last segment, or -1 if none was found.</param>
    private void IdentifyCompleteMessage(out int startIndex, out int finishIndex) {
        startIndex = -1;
        finishIndex = -1;

        int index = 0;
        while (index < Count) {
            Segment candidate = (Segment)this.GetByIndex(index);
            if (!candidate.Headers.IsFirstInMessage) {
                index++; // keep looking for a start segment
                continue;
            }

            int potentialStartIndex = index;
            if (candidate.Headers.IsLastInMessage) {
                // single-segment message: first and last on the same segment
                startIndex = potentialStartIndex;
                finishIndex = index;
                return;
            }

            // Follow the run for as long as sequence numbers stay consecutive.
            uint lastSeqSeen = candidate.Headers.SequenceNumber;
            index++;
            while (index < Count) {
                Segment next = (Segment)this.GetByIndex(index);
                // The gap test comes before the last-in-message test so a
                // missing penultimate segment is not overlooked.
                if (next.Headers.SequenceNumber != (lastSeqSeen + 1)) {
                    Console.WriteLine("Gap In sequence detected");
                    break; // resume the outer scan at the segment after the gap
                }
                if (next.Headers.IsLastInMessage) {
                    startIndex = potentialStartIndex;
                    finishIndex = index;
                    return;
                }
                lastSeqSeen = next.Headers.SequenceNumber;
                index++;
            }
            // Reaching here with index >= Count means the run simply has not
            // finished arriving yet; the outer loop then terminates.
        }
    }

    /// <summary>Instance wrapper around <see cref="CreateTypedMessageStatic"/>.</summary>
    public Message CreateTypedMessage(SegmentData[] src) {
        return CreateTypedMessageStatic(src);
    }

    /// <summary>
    /// Builds the concrete Message subclass selected by the first byte of the
    /// first segment's data, passing it the concatenated segment bytes.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="src"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="src"/> is empty.</exception>
    /// <exception cref="NotSupportedException">The message type byte is not recognised.</exception>
    public static Message CreateTypedMessageStatic(SegmentData[] src) {
        if (src == null) {
            throw new ArgumentNullException("src");
        }
        if (src.Length == 0) {
            throw new ArgumentException("At least one segment is required to build a message", "src");
        }

        byte[] data = SegmentData.ArrayToByteArray(src);
        MessageType msgType = (MessageType)src[0].Data[0];
        switch (msgType) {
            case MessageType.Noop: return new EmptyMessage();
            case MessageType.Retransmission: return new RetransmissionMessage(data);
            case MessageType.CacheIndex: return new CacheIndexMessage(data);
            case MessageType.NoChange: return new NoChangeMessage(data);
            case MessageType.HTTPRequest: return new HTTPRequestMessage(data);
            case MessageType.HTTPResponse: return new HTTPResponseMessage(data);
            case MessageType.CacheUpdateHTTPResponse: return new CacheUpdateHTTPResponseMsg(data);
            case MessageType.CacheUpdateNoChange: return new CacheUpdateNoChangeMsg(data);
            case MessageType.Chunk: return new MessageChunk(data);
            case MessageType.CacheIndexRequest: return new CacheIndexRequestMessage(data);
            default: throw new NotSupportedException(String.Format("Message type of {0} is not recognised", msgType));
        }
    }
}
00293         /*************************************************************************/
/// <summary>
/// Collects MessageChunk messages and rebuilds the original messages once
/// all of their chunks have arrived. Chunks are grouped by their timestamp.
/// </summary>
public class ChunkAssembler {
    /*********************************************************************/
    /// <summary>
    /// Holds the partially assembled chunked messages that share one timestamp.
    /// </summary>
    class AssemblyContainer {
        /*****************************************************************/
        /// <summary>
        /// Assembles one chunked message whose chunks occupy the offsets
        /// first..last (inclusive).
        /// </summary>
        class ChunkedMessageAssembler {
            private int first;
            private int last;
            private int count; // number of slots of chunkArray that are filled
            MessageChunk[] chunkArray;

            /// <summary>Number of chunks still missing from this message.</summary>
            public int ChunksOutstanding {
                get { return ((last - first) + 1) - count; }
            }

            /// <summary>True when the chunk's offset lies inside this message's offset range.</summary>
            public bool BelongsHere(MessageChunk chunk) {
                return (chunk.offset >= first && chunk.offset <= last);
            }

            /// <summary>
            /// Sizes the assembler from the first/last markers of
            /// <paramref name="initialChunk"/> and stores that chunk.
            /// </summary>
            public ChunkedMessageAssembler(MessageChunk initialChunk) {
                first = initialChunk.firstInMessage;
                last = initialChunk.lastInMessage;
                chunkArray = new MessageChunk[(last - first) + 1];
                count = 0;
                Add(initialChunk);
            }

            /// <summary>Stores a chunk in the slot given by its offset.</summary>
            /// <exception cref="ApplicationException">
            /// The chunk's first/last markers differ from this message's, its
            /// offset is outside the range, or its slot is already filled.
            /// </exception>
            public void Add(MessageChunk chunk) {
                if (chunk.firstInMessage != first || chunk.lastInMessage != last) {
                    throw new ApplicationException("Attempting to add a chunk to a ChunkedMessageAssember that doesn't match first and last");
                }
                if (!BelongsHere(chunk)) {
                    // Without this check a bad offset would surface as an
                    // array index error instead of a protocol error.
                    throw new ApplicationException("Chunk offset lies outside the first/last range of its message");
                }
                if (chunkArray[chunk.offset - first] != null) { // -first to return to zero based.
                    throw new ApplicationException("Attempting to assign chunk to somewhere full");
                }
                chunkArray[chunk.offset - first] = chunk;
                count++;
            }

            /// <summary>Builds the typed message from the collected chunks.</summary>
            /// <exception cref="ApplicationException">Chunks are still missing.</exception>
            public Message ExtractMessage() {
                if (ChunksOutstanding != 0) {
                    throw new ApplicationException("Still chunks outstanding");
                }
                return AssemblyLine.CreateTypedMessageStatic(MessageChunk.GetMessageFromChunks(chunkArray));
            }
        }
        /*****************************************************************/
        public DateTime timestamp;
        private ArrayList chunkedMessageAssemblers;

        /// <summary>Creates an empty container for the given timestamp.</summary>
        public AssemblyContainer(DateTime timestamp) {
            this.timestamp = timestamp;
            chunkedMessageAssemblers = new ArrayList();
        }

        /// <summary>True when no partially assembled message remains in this container.</summary>
        public bool IsEmpty {
            get { return chunkedMessageAssemblers.Count == 0; }
        }

        /// <summary>
        /// Hands the chunk to the assembler whose offset range covers it, or
        /// starts a new assembler when none does.
        /// </summary>
        public void AddChunk(MessageChunk chunk) {
            foreach (ChunkedMessageAssembler assembler in chunkedMessageAssemblers) {
                // Every assembler is examined; stopping after the first one
                // would discard chunks that belong to a later message.
                if (assembler.BelongsHere(chunk)) {
                    assembler.Add(chunk);
                    return;
                }
            }
            chunkedMessageAssemblers.Add(new ChunkedMessageAssembler(chunk));
        }

        /// <summary>True when at least one message in this container has all its chunks.</summary>
        public bool ContainsCompleteMessage() {
            foreach (ChunkedMessageAssembler chunkedAssembler in chunkedMessageAssemblers) {
                if (chunkedAssembler.ChunksOutstanding == 0) { return true; }
            }
            return false;
        }

        /// <summary>
        /// Removes and returns the first fully assembled message, or null when
        /// none is complete.
        /// </summary>
        public Message ExtractCompleteMessage() {
            foreach (ChunkedMessageAssembler chunkedAssembler in chunkedMessageAssemblers) {
                if (chunkedAssembler.ChunksOutstanding != 0) { continue; }
                Message result = chunkedAssembler.ExtractMessage();
                if (result != null) {
                    // Safe while enumerating only because we leave the loop
                    // immediately afterwards.
                    chunkedMessageAssemblers.Remove(chunkedAssembler);
                    return result;
                }
            }
            return null;
        }
    }
    /*********************************************************************/
    private ListDictionary assemblyContainers; // timestamp -> AssemblyContainer

    /// <summary>Creates an assembler with no pending chunks.</summary>
    public ChunkAssembler() {
        assemblyContainers = new ListDictionary();
    }

    /// <summary>Adds a chunk message to the container for its timestamp.</summary>
    /// <exception cref="ApplicationException"><paramref name="msg"/> is not of type Chunk.</exception>
    public void Add(Message msg) {
        if (msg.Type != MessageType.Chunk) {
            throw new ApplicationException("Unable to assemble non-chunked message in message chunk assembly");
        }

        MessageChunk chunk = (MessageChunk)msg;
        AssemblyContainer currentContainer = (AssemblyContainer)assemblyContainers[chunk.timestamp];
        if (currentContainer == null) {
            currentContainer = new AssemblyContainer(chunk.timestamp);
            assemblyContainers.Add(chunk.timestamp, currentContainer);
        }
        currentContainer.AddChunk(chunk);
    }

    /// <summary>Discards all pending chunks.</summary>
    public void Init() {
        assemblyContainers.Clear();
    }

    /// <summary>True when any container holds a fully assembled message.</summary>
    public bool ContainsCompleteMessage() {
        foreach (AssemblyContainer container in assemblyContainers.Values) {
            if (container.ContainsCompleteMessage()) { return true; }
        }
        return false;
    }

    /// <summary>
    /// Removes and returns the first fully assembled message, or null when no
    /// message is complete. A container left empty by the extraction is
    /// dropped so the dictionary does not grow by one entry per timestamp.
    /// </summary>
    public Message ExtractCompleteMessage() {
        Message result = null;
        object drainedKey = null;
        foreach (DictionaryEntry entry in assemblyContainers) {
            AssemblyContainer container = (AssemblyContainer)entry.Value;
            result = container.ExtractCompleteMessage();
            if (result != null) {
                if (container.IsEmpty) { drainedKey = entry.Key; }
                break;
            }
        }
        // Removal happens after the enumeration has finished.
        if (drainedKey != null) {
            assemblyContainers.Remove(drainedKey);
        }
        return result;
    }
}
00441 }
00442 
00443 

Generated on Mon May 8 22:07:27 2006 by  doxygen 1.3.9.1