Main Page | Class Hierarchy | Class List | File List | Class Members

SmartDeviceClient/SmartProtocolStack/RemoteHost/MessageAssembly.cs

00001 using System;
00002 using System.Collections;
00003 using System.Collections.Specialized;
00004 using System.Threading;
00005 using GPRSWeb.SmartDeviceClient.Common;
00006 
00007 namespace GPRSWeb.SmartDeviceClient.SmartProtocolStack.RemoteHost
00008 {
00009         /*************************************************************************/
/// <summary>
/// Collects the segments received from one remote host, reassembles them into
/// complete messages and forwards those messages to the stack's received-message
/// queue.
/// </summary>
/// <remarks>
/// Segments are handed in through <see cref="Process"/> on the caller's thread.
/// A dedicated worker thread, controlled by <see cref="Start"/> and
/// <see cref="Stop"/>, waits on the assembly line's SyncRoot and extracts each
/// message as soon as the line reports a complete one. Every access to the
/// assembly line is made while holding that SyncRoot.
/// </remarks>
public class MessageAssembly
{
    AssemblyLine assemblyLine; // segment storage and processing
    protected MessageQueue messageQueueOutput; // message destination
    protected StackInterface thisStack;
    ChunkAssembler chunkAssembler; // re-joins messages that arrive as Chunk messages
    Thread processingThread; // worker running Run(); null until Start() is first called

    /// <summary>
    /// Creates an assembler bound to the stack of <paramref name="remoteHost"/>.
    /// Completed messages are delivered to that stack's ReceivedMessages queue.
    /// </summary>
    /// <param name="remoteHost">Remote host whose stack this assembler feeds.</param>
    public MessageAssembly(RemoteHostComms remoteHost) {
        thisStack = remoteHost.thisStack;
        assemblyLine = new AssemblyLine();
        chunkAssembler = new ChunkAssembler();
        messageQueueOutput = thisStack.ReceivedMessages;
    }

    /// <summary>
    /// Inserts a received segment into the assembly line and wakes the worker
    /// thread so it can re-check for a complete message.
    /// </summary>
    /// <param name="s">The segment to add.</param>
    public void Process(Segment s) {
        lock (assemblyLine.SyncRoot) {
            assemblyLine.Add(s);
            if (s.IsRetransmission) {
                Console.WriteLine("Just added retransmissoin {0} to assembly line", s.Headers.SequenceNumber);
            }
            // Run() may be blocked in Monitor.Wait on this same object.
            Monitor.PulseAll(assemblyLine.SyncRoot);
        }
    }

    /// <summary>
    /// Discards any partially assembled state and starts the worker thread.
    /// If the worker is already alive an error is written to the console and
    /// nothing else happens.
    /// </summary>
    public void Start() {
        if (processingThread == null || !processingThread.IsAlive) {
            processingThread = new Thread(new ThreadStart(Run));
            // Process() can be called from another thread at any time, so the
            // assembly line must only be touched under its SyncRoot, exactly as
            // Process() and Run() do.
            lock (assemblyLine.SyncRoot) {
                assemblyLine.Clear();
            }
            // The chunk assembler is only used by the worker thread, which is
            // not running at this point.
            chunkAssembler.Init();
            processingThread.Start();
        } else {
            Console.Error.WriteLine("*** Error starting Message Assembly thread: Thread already started! ***");
        }
    }

    /// <summary>
    /// Aborts the worker thread, if it is running, and waits for it to finish.
    /// </summary>
    public void Stop() {
        if (processingThread != null && processingThread.IsAlive) {
            processingThread.Abort();
            processingThread.Join();
        }
    }

    /// <summary>
    /// Worker loop: waits for a complete message on the assembly line, extracts
    /// it and dispatches it by type. Runs until the thread is aborted by
    /// <see cref="Stop"/>.
    /// </summary>
    void Run() {
        Console.WriteLine("Started Assembly line assembler");
        Message msg = null;
        try {
            while (true) {
                /* check for completed assembly lines */
                msg = null;
                lock (assemblyLine.SyncRoot) {
                    // Sleep until Process() pulses; re-test after every wake-up.
                    while (!assemblyLine.HoldsCompleteMessage) {
                        Monitor.Wait(assemblyLine.SyncRoot);
                    }
                    // syphon off the message
                    msg = assemblyLine.ExtractCompleteMessage();
                    Monitor.PulseAll(assemblyLine.SyncRoot);
                }

                // Dispatch outside the lock so Process() is not blocked while
                // the message is being delivered.
                if (msg.Type == MessageType.Retransmission) {
                    // A retransmission message wraps a raw segment: flag it and
                    // feed it back in through the remote host that sent it.
                    RetransmissionMessage retrans = (RetransmissionMessage)msg;
                    Segment retransSegment = retrans.RawSegment;
                    retransSegment.IsRetransmission = true;
                    Console.WriteLine("Just got retransmission of {0}. Adding to assembly. Retrans flag {1}", retrans.RawSegment.Headers.SequenceNumber, retrans.RawSegment.IsRetransmission);
                    thisStack.GetRemoteHostByID(retransSegment.Headers.SourceDeviceID).AcceptIncomingSegment(retransSegment);
                } else if (msg.Type == MessageType.Chunk) {
                    // One piece of a larger message: collect it and forward
                    // every message that has become complete as a result.
                    Console.WriteLine("> Got Message Chunk");
                    chunkAssembler.Add(msg);
                    while (chunkAssembler.ContainsCompleteMessage()) {
                        Message reChunkedMsg = chunkAssembler.ExtractCompleteMessage();
                        if (reChunkedMsg != null) {
                            Console.WriteLine("> Got a re-constituted chunked message");
                            messageQueueOutput.EnqueueBlocking(reChunkedMsg);
                        } else {
                            throw new ApplicationException("Extracted a Null chunked message");
                        }
                    }
                } else {
                    // Any other message type goes straight to the output queue.
                    messageQueueOutput.EnqueueBlocking(msg);
                }
            }
        } catch (ThreadAbortException) {
            // Raised by Stop(); the runtime re-raises it after this block, which
            // ends the thread.
            Console.WriteLine("Shutting down message assembly");
        }
    }
}
00111 
/// <summary>
/// Minimal key-ordered dictionary: a <see cref="ListDictionary"/> that also
/// keeps its keys in ascending order so entries can be read by position.
/// </summary>
/// <remarks>
/// Keys must be mutually comparable (they are ordered with the default
/// comparer, i.e. through IComparable). The ordering index is only maintained
/// by the members declared here (Add, Remove, Clear). Because these hide the
/// non-virtual base members, code that mutates the collection through a
/// ListDictionary/IDictionary reference or through the indexer setter bypasses
/// the index and must be avoided.
/// </remarks>
class SortedList : ListDictionary {
    // Keys of the dictionary, kept in ascending order; GetByIndex(i) resolves
    // position i to a key through this list.
    private ArrayList sortedKeys = new ArrayList();

    /// <summary>Adds an entry, keeping the positional order by key.</summary>
    /// <param name="key">Entry key; must be comparable with the existing keys.</param>
    /// <param name="value">Entry value.</param>
    public new void Add(object key, object value) {
        // Find the slot first: if the key cannot be compared this throws
        // before anything has been modified.
        int position = sortedKeys.BinarySearch(key);
        if (position < 0) {
            position = ~position; // complement of a negative result = insertion point
        }
        // Rejects null and duplicate keys; the index is untouched if it throws.
        base.Add(key, value);
        sortedKeys.Insert(position, key);
    }

    /// <summary>Returns the value whose key is the i-th smallest.</summary>
    /// <param name="i">Zero-based position in ascending key order.</param>
    /// <returns>The value stored under that key.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="i"/> is negative or not less than Count.
    /// </exception>
    public object GetByIndex(int i) {
        return base[sortedKeys[i]];
    }

    /// <summary>Removes the entry with the given key; absent keys are ignored.</summary>
    /// <param name="key">Key of the entry to remove.</param>
    public new void Remove(object key) {
        int position = sortedKeys.BinarySearch(key);
        base.Remove(key);
        if (position >= 0) {
            sortedKeys.RemoveAt(position);
        }
    }

    /// <summary>Removes every entry and resets the ordering index.</summary>
    public new void Clear() {
        base.Clear();
        sortedKeys.Clear();
    }
}
00125 
00126         /*************************************************************************/
/// <summary>
/// Holding area for received segments, keyed by sequence number, from which
/// complete messages are identified and extracted.
/// </summary>
/// <remarks>
/// A message is considered complete when the line holds, at adjacent positions,
/// a segment flagged IsFirstInMessage, followed by segments whose sequence
/// numbers each increase by exactly one, ending in a segment flagged
/// IsLastInMessage (a single segment may carry both flags). The scan therefore
/// relies on GetByIndex walking the segments in ascending sequence-number
/// order. The class does no locking of its own.
/// </remarks>
class AssemblyLine : SortedList {
    // Number of held segments flagged IsFirstInMessage / IsLastInMessage.
    // Used only as a cheap pre-check before scanning the whole line.
    private int startCount;
    private int finishCount;

    /// <summary>Creates an empty assembly line.</summary>
    public AssemblyLine() : base() {
        startCount = 0;
        finishCount = 0;
    }

    /// <summary>
    /// Stores a segment under its sequence number and updates the first/last
    /// counters.
    /// </summary>
    /// <param name="seg">Segment to store.</param>
    public void Add(Segment seg) {
        // Counters are only touched once the insert has succeeded.
        this.Add(seg.Headers.SequenceNumber, seg);
        if (seg.Headers.IsFirstInMessage) { startCount++; }
        if (seg.Headers.IsLastInMessage) { finishCount++; }
    }

    /// <summary>
    /// Removes every segment and resets the first/last counters so they keep
    /// describing the (now empty) contents.
    /// </summary>
    public new void Clear() {
        base.Clear();
        startCount = 0;
        finishCount = 0;
    }

    /// <summary>
    /// True when the line currently holds at least one complete message.
    /// </summary>
    public bool HoldsCompleteMessage {
        get {
            // Without at least one start and one finish segment there cannot
            // be a complete message, so skip the scan.
            if (startCount <= 0 || finishCount <= 0) {
                return false;
            }
            int firstIndex, lastIndex;
            IdentifyCompleteMessage(out firstIndex, out lastIndex);
            return firstIndex != -1 && lastIndex != -1;
        }
    }

    /// <summary>
    /// Removes the segments of the first complete message from the line and
    /// returns the typed message built from their data.
    /// </summary>
    /// <returns>The reassembled message.</returns>
    /// <exception cref="InvalidOperationException">
    /// The line holds no complete message (check HoldsCompleteMessage first).
    /// </exception>
    public Message ExtractCompleteMessage() {
        int startIndex, finishIndex;
        IdentifyCompleteMessage(out startIndex, out finishIndex);
        if (startIndex < 0 || finishIndex < 0) {
            throw new InvalidOperationException("Assembly line does not hold a complete message");
        }

        int segmentCount = finishIndex - startIndex + 1;
        SegmentData[] segmentData = new SegmentData[segmentCount];
        // Keys are kept boxed exactly as Add() boxed them, so that removal
        // matches the stored key regardless of the declared type of
        // SequenceNumber.
        object[] keys = new object[segmentCount];
        for (int i = 0; i < segmentCount; i++) {
            Segment seg = (Segment)this.GetByIndex(startIndex + i);
            segmentData[i] = seg.Data;
            keys[i] = seg.Headers.SequenceNumber;
        }

        // Collect first, remove afterwards: removing shifts the positions.
        foreach (object key in keys) {
            Remove(key);
        }
        // Exactly one first and one last segment have left the line. Adjust
        // before building the message so the counters stay correct even if
        // construction throws.
        startCount--;
        finishCount--;

        return CreateTypedMessage(segmentData);
    }

    /// <summary>Writes the counters and a one-line summary of every held segment to the console.</summary>
    public void PrintContents() {
        Console.Write("StartCount: {0}, FinishCouunt: {1}. Contents:", startCount, finishCount);
        for (int i = 0; i < this.Count; i++) {
            Segment s = (Segment)this.GetByIndex(i);
            Console.Write("[{0}] {1} {2}{3},", i, s.Headers.SequenceNumber, s.Headers.IsFirstInMessage ? "F" : "_", s.Headers.IsLastInMessage ? "L" : "_");
        }
        Console.WriteLine();
    }

    /// <summary>
    /// Finds the first run of positions that forms a complete message.
    /// </summary>
    /// <param name="startIndex">Position of the first segment, or -1 if none was found.</param>
    /// <param name="finishIndex">Position of the last segment, or -1 if none was found.</param>
    private void IdentifyCompleteMessage(out int startIndex, out int finishIndex) {
        startIndex = -1;
        finishIndex = -1;

        int index = 0;
        while (index < Count) {
            Segment candidate = (Segment)this.GetByIndex(index);

            // Skip forward until a start segment is found.
            if (!candidate.Headers.IsFirstInMessage) {
                index++;
                continue;
            }

            // Single-segment message: first and last on the same segment.
            if (candidate.Headers.IsLastInMessage) {
                startIndex = index;
                finishIndex = index;
                return;
            }

            // Follow the sequence from the start segment until it either
            // reaches a finish segment, hits a gap, or runs off the end.
            int potentialStartIndex = index;
            uint lastSeqSeen = candidate.Headers.SequenceNumber;
            index++;
            while (index < Count) {
                Segment next = (Segment)this.GetByIndex(index);
                // The gap test comes before the last-in-message test: a finish
                // segment that is not contiguous means an earlier segment
                // (e.g. the penultimate one) is still missing.
                if (next.Headers.SequenceNumber != (lastSeqSeen + 1)) {
                    Console.WriteLine("Gap In sequence detected");
                    // Leave index on this segment; the outer loop will examine
                    // it as a possible new start.
                    break;
                }
                if (next.Headers.IsLastInMessage) {
                    startIndex = potentialStartIndex;
                    finishIndex = index;
                    return;
                }
                lastSeqSeen = next.Headers.SequenceNumber;
                index++;
            }
        }
    }

    /// <summary>Instance wrapper around <see cref="CreateTypedMessageStatic"/>.</summary>
    /// <param name="src">Data of the message's segments, in order.</param>
    /// <returns>The typed message.</returns>
    public Message CreateTypedMessage(SegmentData[] src) {
        return CreateTypedMessageStatic(src);
    }

    /// <summary>
    /// Builds the concrete message object for the given segment data. The
    /// message type is read from the first byte of the first segment.
    /// </summary>
    /// <param name="src">Data of the message's segments, in order; must not be empty.</param>
    /// <returns>A message of the class matching the encoded type.</returns>
    /// <exception cref="ArgumentException"><paramref name="src"/> is null or empty.</exception>
    /// <exception cref="NotSupportedException">The encoded message type is not recognised.</exception>
    public static Message CreateTypedMessageStatic(SegmentData[] src) {
        if (src == null || src.Length == 0) {
            throw new ArgumentException("At least one segment is required to build a message", "src");
        }
        byte[] data = SegmentData.ArrayToByteArray(src);
        MessageType msgType = (MessageType)src[0].Data[0];
        switch (msgType) {
            case MessageType.Noop: return new EmptyMessage();
            case MessageType.Retransmission: return new RetransmissionMessage(data);
            case MessageType.CacheIndex: return new CacheIndexMessage(data);
            case MessageType.NoChange: return new NoChangeMessage(data);
            case MessageType.HTTPRequest: return new HTTPRequestMessage(data);
            case MessageType.HTTPResponse: return new HTTPResponseMessage(data);
            case MessageType.CacheUpdateHTTPResponse: return new CacheUpdateHTTPResponseMsg(data);
            case MessageType.CacheUpdateNoChange: return new CacheUpdateNoChangeMsg(data);
            case MessageType.Chunk: return new MessageChunk(data);
            case MessageType.CacheIndexRequest: return new CacheIndexRequestMessage(data);
            default: throw new NotSupportedException(String.Format("Message type of {0} is not recognised", msgType));
        }
    }
}
00305         /*************************************************************************/
/// <summary>
/// Re-joins messages that were split into <c>MessageChunk</c> messages.
/// Chunks are grouped by their timestamp, and within one timestamp by the
/// chunk range (firstInMessage..lastInMessage) of the message they belong to.
/// </summary>
public class ChunkAssembler {
    /*********************************************************************/
    /// <summary>
    /// All partially assembled chunked messages that share one timestamp.
    /// </summary>
    class AssemblyContainer {
        /*****************************************************************/
        /// <summary>
        /// Collects the chunks of a single message, identified by the chunk
        /// range first..last taken from the first chunk seen.
        /// </summary>
        class ChunkedMessageAssembler {
            private int first; // firstInMessage of every chunk of this message
            private int last;  // lastInMessage of every chunk of this message
            private int count; // number of slots in chunkArray filled so far
            MessageChunk[] chunkArray; // slot (offset - first) holds the chunk with that offset

            /// <summary>Number of chunks still missing before the message is complete.</summary>
            public int ChunksOutstanding {
                get { return ((last - first) + 1) - count; }
            }

            /// <summary>True when the chunk's offset lies inside this message's range.</summary>
            public bool BelongsHere(MessageChunk chunk) {
                return (chunk.offset >= first && chunk.offset <= last);
            }

            /// <summary>Creates an assembler sized from, and seeded with, <paramref name="initialChunk"/>.</summary>
            public ChunkedMessageAssembler(MessageChunk initialChunk) {
                first = initialChunk.firstInMessage;
                last = initialChunk.lastInMessage;
                chunkArray = new MessageChunk[(last - first) + 1];
                count = 0;
                Add(initialChunk);
            }

            /// <summary>Stores a chunk in its slot.</summary>
            /// <exception cref="ApplicationException">
            /// The chunk's first/last do not match this assembler, or its slot
            /// is already occupied.
            /// </exception>
            public void Add(MessageChunk chunk) {
                if (chunk.firstInMessage != first || chunk.lastInMessage != last) {
                    throw new ApplicationException("Attempting to add a chunk to a ChunkedMessageAssember that doesn't match first and last");
                }
                int slot = chunk.offset - first; // -first to return to zero based.
                if (chunkArray[slot] != null) {
                    throw new ApplicationException("Attempting to assign chunk to somewhere full");
                }
                chunkArray[slot] = chunk;
                count++;
            }

            /// <summary>Builds the typed message from the collected chunks.</summary>
            /// <exception cref="ApplicationException">Chunks are still outstanding.</exception>
            public Message ExtractMessage() {
                if (ChunksOutstanding != 0) {
                    throw new ApplicationException("Still chunks outstanding");
                }
                return AssemblyLine.CreateTypedMessageStatic(MessageChunk.GetMessageFromChunks(chunkArray));
            }
        }
        /*****************************************************************/
        public DateTime timestamp;
        private ArrayList chunkedMessageAssemblers; // one ChunkedMessageAssembler per message in progress

        /// <summary>Creates an empty container for the given timestamp.</summary>
        public AssemblyContainer(DateTime timestamp) {
            this.timestamp = timestamp;
            chunkedMessageAssemblers = new ArrayList();
        }

        /// <summary>True when no message is in progress in this container.</summary>
        public bool IsEmpty {
            get { return chunkedMessageAssemblers.Count == 0; }
        }

        /// <summary>
        /// Hands the chunk to the assembler whose range contains its offset,
        /// or starts a new assembler when none does.
        /// </summary>
        public void AddChunk(MessageChunk chunk) {
            // Every assembler has to be tried: stopping after the first one
            // would drop chunks of any message other than the first in this
            // container.
            foreach (ChunkedMessageAssembler assembler in chunkedMessageAssemblers) {
                if (assembler.BelongsHere(chunk)) {
                    assembler.Add(chunk);
                    return;
                }
            }
            chunkedMessageAssemblers.Add(new ChunkedMessageAssembler(chunk));
        }

        /// <summary>True when at least one message in this container has all its chunks.</summary>
        public bool ContainsCompleteMessage() {
            foreach (ChunkedMessageAssembler chunkedAssembler in chunkedMessageAssemblers) {
                if (chunkedAssembler.ChunksOutstanding == 0) {
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Extracts the first complete message and drops its assembler.
        /// </summary>
        /// <returns>The message, or null when none is complete.</returns>
        public Message ExtractCompleteMessage() {
            // Index loop so the finished assembler can be removed in place.
            for (int i = 0; i < chunkedMessageAssemblers.Count; i++) {
                ChunkedMessageAssembler chunkedAssembler = (ChunkedMessageAssembler)chunkedMessageAssemblers[i];
                if (chunkedAssembler.ChunksOutstanding != 0) {
                    continue;
                }
                Message result = chunkedAssembler.ExtractMessage();
                if (result != null) {
                    chunkedMessageAssemblers.RemoveAt(i);
                    return result;
                }
            }
            return null;
        }
    }
    /*********************************************************************/
    private ListDictionary assemblyContainers; // chunk timestamp -> AssemblyContainer

    /// <summary>Creates an empty chunk assembler.</summary>
    public ChunkAssembler() {
        assemblyContainers = new ListDictionary();
    }

    /// <summary>Adds a chunk to the container for its timestamp, creating the container if needed.</summary>
    /// <param name="msg">A message of type Chunk.</param>
    /// <exception cref="ApplicationException"><paramref name="msg"/> is not a chunk.</exception>
    public void Add(Message msg) {
        if (msg.Type != MessageType.Chunk) {
            throw new ApplicationException("Unable to assemble non-chunked message in message chunk assembly");
        }
        MessageChunk chunk = (MessageChunk)msg;
        AssemblyContainer currentContainer = (AssemblyContainer)assemblyContainers[chunk.timestamp];
        if (currentContainer == null) {
            // Only non-null containers are ever stored, so null means "no entry yet".
            currentContainer = new AssemblyContainer(chunk.timestamp);
            assemblyContainers.Add(chunk.timestamp, currentContainer);
        }
        currentContainer.AddChunk(chunk);
    }

    /// <summary>Discards all partially assembled messages.</summary>
    public void Init() {
        assemblyContainers.Clear();
    }

    /// <summary>True when any container holds a message with all its chunks.</summary>
    public bool ContainsCompleteMessage() {
        foreach (AssemblyContainer container in assemblyContainers.Values) {
            if (container.ContainsCompleteMessage()) {
                return true;
            }
        }
        return false;
    }

    /// <summary>
    /// Extracts one complete message, if there is one, and releases the
    /// container it came from once that container has nothing left in progress.
    /// </summary>
    /// <returns>The message, or null when no message is complete.</returns>
    public Message ExtractCompleteMessage() {
        Message result = null;
        object emptiedKey = null;
        foreach (DictionaryEntry entry in assemblyContainers) {
            AssemblyContainer container = (AssemblyContainer)entry.Value;
            result = container.ExtractCompleteMessage();
            if (result != null) {
                if (container.IsEmpty) {
                    emptiedKey = entry.Key;
                }
                break;
            }
        }
        // Removal happens after the enumeration has ended. Without it the
        // dictionary would keep one empty container per timestamp ever seen.
        if (emptiedKey != null) {
            assemblyContainers.Remove(emptiedKey);
        }
        return result;
    }
}
00453 }
00454 
00455 

Generated on Mon May 8 22:07:27 2006 by  doxygen 1.3.9.1