PeerFlooder.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / PeerFlooder.cs / 1 / PeerFlooder.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{ 
    using System.Collections;
    using System.Collections.Generic; 
    using System.Collections.ObjectModel; 
    using System.Diagnostics;
    using System.IO; 
    using System.Runtime.Serialization;
    using System.ServiceModel;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Dispatcher; 
    using System.ServiceModel.Security;
    using System.Xml; 
    using System.Threading; 

    class PeerFlooder : PeerFlooderSimple 
    {

        PeerFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager):base(config, neighborManager){}
 
        public static PeerFlooder CreateFlooder(PeerNodeConfig config, PeerNeighborManager neighborManager, IPeerNodeMessageHandling messageHandler )
        { 
            PeerFlooder flooder = new PeerFlooder(config, neighborManager); 
            flooder.messageHandler = messageHandler;
            return flooder; 
        }

    }
 
    interface IFlooderForThrottle
    { 
        void OnThrottleReached(); 
        void OnThrottleReleased();
    } 


    abstract class PeerFlooderBase: IFlooderForThrottle, IPeerFlooderContract where TFloodContract : Message
    { 
        protected PeerNodeConfig config;
        protected PeerNeighborManager neighborManager; 
        protected List neighbors; 
        private object thisLock = new object();
 
        internal IPeerNodeMessageHandling messageHandler;
        internal PeerThrottleHelper quotaHelper;
        long messageSequence;
 
        public event EventHandler ThrottleReached;
        public event EventHandler SlowNeighborKilled; 
        public event EventHandler ThrottleReleased; 
        public EventHandler OnMessageSentHandler;
 

        public PeerFlooderBase(PeerNodeConfig config, PeerNeighborManager neighborManager)
        {
            this.neighborManager = neighborManager; 
            this.neighbors = new List();
            this.config = config; 
            this.neighbors = this.neighborManager.GetConnectedNeighbors(); 
            this.quotaHelper = new PeerThrottleHelper(this, this.config.MaxPendingOutgoingCalls);
            OnMessageSentHandler = new EventHandler(OnMessageSent); 
        }

        void PruneNeighborCallback(IPeerNeighbor peer)
        { 
            lock(ThisLock)
            { 
                if(this.Neighbors.Count <= 1) 
                    return;
                if(DiagnosticUtility.ShouldTraceWarning) 
                {
                    string message = SR.GetString(SR.PeerThrottlePruning, this.config.MeshId);
                    PeerThrottleTraceRecord record = new PeerThrottleTraceRecord(this.config.MeshId, message);
                    TraceUtility.TraceEvent(TraceEventType.Warning, TraceCode.PeerFlooderReceiveMessageQuotaExceeded, 
                                                record, this, null);
                } 
            } 
            try
            { 
                peer.Abort(PeerCloseReason.NodeTooSlow, PeerCloseInitiator.LocalNode);
            }
            catch(Exception e)
            { 
                if(DiagnosticUtility.IsFatal(e)) throw;
                if (null != CloseNeighborIfKnownException(neighborManager, e, peer)) 
                { 
                    throw;
                } 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
            }
        }
 
        void IFlooderForThrottle.OnThrottleReached()
        { 
            if(DiagnosticUtility.ShouldTraceInformation) 
            {
                string message = SR.GetString(SR.PeerThrottleWaiting, this.config.MeshId); 
                PeerThrottleTraceRecord record = new PeerThrottleTraceRecord(this.config.MeshId, message);
                TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerFlooderReceiveMessageQuotaExceeded,
                                            record, this, null);
            } 

            IPeerNeighbor peer = this.neighborManager.SlowestNeighbor(); 
            if(peer == null) 
                return;
            UtilityExtension extension = peer.Utility; 
            if(peer.IsConnected && extension != null)
            {
                if(extension.PendingMessages > PeerTransportConstants.MessageThreshold)
                { 
                    extension.BeginCheckPoint(new UtilityExtension.PruneNeighborCallback(PruneNeighborCallback));
                } 
                else 
                {
                    DiagnosticUtility.DebugAssert(false, "Neighbor is marked slow with messages "+extension.PendingMessages); 
                }
                FireReachedEvent();
            }
        } 

        void IFlooderForThrottle.OnThrottleReleased() 
        { 
            FireDequeuedEvent();
        } 

        public void FireDequeuedEvent(){FireEvent(ThrottleReleased);}

        public void FireReachedEvent(){FireEvent(ThrottleReached);} 

        public void FireKilledEvent(){FireEvent(SlowNeighborKilled);} 
 
        void FireEvent(EventHandler handler)
        { 
            if (handler != null)
                handler(this, EventArgs.Empty);
        }
 
        public virtual IAsyncResult BeginFloodEncodedMessage(byte[] id, MessageBuffer encodedMessage, TimeSpan timeout, AsyncCallback callback, object state)
        { 
            RecordOutgoingMessage(id); 
            SynchronizationContext syncContext = ThreadBehavior.GetCurrentSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(null); 

            if (neighbors.Count == 0)
            {
                return new CompletedAsyncResult(callback, state); 
            }
            try 
            { 
                return FloodMessageToNeighbors(encodedMessage, timeout, callback, state, -1, null, null, OnMessageSentHandler);
            } 
            finally
            {
                SynchronizationContext.SetSynchronizationContext(syncContext);
            } 

        } 
 
        protected virtual IAsyncResult BeginFloodReceivedMessage(IPeerNeighbor sender, MessageBuffer messageBuffer,
            TimeSpan timeout, AsyncCallback callback, object state, int index, MessageHeader hopHeader) 
        {
            quotaHelper.AcquireNoQueue();

            try 
            {
                return FloodMessageToNeighbors(messageBuffer, timeout, callback, state, index, hopHeader, sender, OnMessageSentHandler); 
            } 
            catch (Exception e)
            { 
                if(DiagnosticUtility.IsFatal(e)) throw;
                if (e is QuotaExceededException || (e is CommunicationException && e.InnerException is QuotaExceededException))
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); 
                    if (DiagnosticUtility.ShouldTraceError)
                    { 
                        PeerFlooderTraceRecord record = new PeerFlooderTraceRecord( 
                                                            this.config.MeshId,
                                                            sender.ListenAddress, 
                                                            e);
                        TraceUtility.TraceEvent(
                                    TraceEventType.Error,
                                    TraceCode.PeerFlooderReceiveMessageQuotaExceeded, 
                                    record,
                                    this, 
                                    null); 
                    }
                    return null; 
                }
                throw;
            }
        } 

        protected IAsyncResult BeginSendHelper(IPeerNeighbor neighbor, TimeSpan timeout, Message message, FloodAsyncResult fresult) 
        { 
            IAsyncResult result = null;
            bool fatal = false; 
            try
            {
                UtilityExtension.OnMessageSent(neighbor);
                result = neighbor.BeginSend(message, timeout, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(fresult.OnSendComplete)), message); 
                fresult.AddResult(result, neighbor);
                if (result.CompletedSynchronously) 
                { 
                    neighbor.EndSend(result);
                    UtilityExtension.OnEndSend(neighbor, fresult); 
                }
                return result;
            }
            catch (Exception e) 
            {
                if(DiagnosticUtility.IsFatal(e)) 
                { 
                    fatal = true;
                    throw; 
                }
                if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor))
                {
                    fresult.MarkEnd(false); 
                    throw;
                } 
 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                return null; 
            }
            finally
            {
                if((result == null || result.CompletedSynchronously) && !fatal) 
                    message.Close();
 
            } 
        }
 
        public void OnMessageSent(object sender, EventArgs args)
        {
            quotaHelper.ItemDequeued();
        } 

        void KillSlowNeighbor() 
        { 
            IPeerNeighbor neighbor = this.neighborManager.SlowestNeighbor();
            if(neighbor != null) 
                neighbor.Abort(PeerCloseReason.NodeTooSlow, PeerCloseInitiator.LocalNode);
        }

 
        protected virtual IAsyncResult FloodMessageToNeighbors(MessageBuffer messageBuffer,
                                                               TimeSpan timeout, AsyncCallback callback, object state, 
                                                               int index, MessageHeader hopHeader, IPeerNeighbor except, 
                                                               EventHandler OnMessageSentCallback)
        { 
            long temp = Interlocked.Increment(ref messageSequence);
            FloodAsyncResult fresult = new FloodAsyncResult(this.neighborManager, timeout, callback, state);
            fresult.OnMessageSent += OnMessageSentCallback;
            List neighbors = this.Neighbors; 

            foreach (IPeerNeighbor neighbor in neighbors) 
            { 
                if(neighbor.Equals(except))
                    continue; 
                // Don't do anything if the neighbor is not connected
                if (PeerNeighborStateHelper.IsConnected(neighbor.State))
                {
                    Message fmessage = messageBuffer.CreateMessage(); 
                    if(index != -1)
                    { 
                        fmessage.Headers.ReplaceAt(index, hopHeader); 
                    }
 
                    // Don't do anything if the neighbor is not connected
                    if (PeerNeighborStateHelper.IsConnected(neighbor.State))
                    {
                        BeginSendHelper(neighbor, timeout, fmessage, fresult); 
                    }
                } 
            } 
            fresult.MarkEnd(true);
            return fresult; 

        }

        public void Open() 
        {
            OnOpen(); 
        } 

        public void Close() 
        {
            OnClose();
        }
 
        public abstract void OnOpen();
 
        public abstract void OnClose(); 

        public virtual void OnNeighborConnected(IPeerNeighbor neighbor) 
        {
            this.neighbors = this.neighborManager.GetConnectedNeighbors();
        }
 
        public virtual void OnNeighborClosed(IPeerNeighbor neighbor)
        { 
            this.neighbors = this.neighborManager.GetConnectedNeighbors(); 
        }
 
        public abstract void ProcessLinkUtility(IPeerNeighbor neighbor, TLinkContract utilityInfo);

        public abstract bool ShouldProcess(TFloodContract floodInfo);
        public abstract void RecordOutgoingMessage(byte[] id); 

        int UpdateHopCount(Message message, out MessageHeader hopHeader, out ulong currentValue) 
        { 
            int index = -1;
            currentValue = PeerTransportConstants.MaxHopCount; 
            hopHeader = null;
            try
            {
                // If a message contains multiple Hopcounts with our name and namespace or the message can't deserialize to a ulong then ignore the HopCount 
                index = message.Headers.FindHeader(PeerStrings.HopCountElementName, PeerStrings.HopCountElementNamespace);
                if(index != -1) 
                { 
                    currentValue = PeerMessageHelpers.GetHeaderULong(message.Headers, index);
                    hopHeader = MessageHeader.CreateHeader(PeerStrings.HopCountElementName, PeerStrings.HopCountElementNamespace, --currentValue, false); 
                }
            }
            catch (MessageHeaderException e)
            { 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning);
            } 
            catch (CommunicationException e) 
            {
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning); 
            }
            catch(SerializationException e)
            {
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning); 
            }
            catch(XmlException e) 
            { 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning);
            } 
            DiagnosticUtility.DebugAssert((index == -1) || (hopHeader != null), "Could not successfully create new HopCount Header!");
            return index;
        }
 
        public virtual IAsyncResult OnFloodedMessage(IPeerNeighbor neighbor, TFloodContract floodInfo, AsyncCallback callback, object state)
        { 
            bool process = false; 
            MessageBuffer messageBuffer = null;
            Message message = null; 
            Uri via;
            Uri to;
            int index=0;
            ulong remainingHops = PeerTransportConstants.MaxHopCount; 
            MessageHeader hopHeader = null;
            bool fatal = false; 
            PeerMessageProperty peerProperty = null; 
            IAsyncResult result = null;
 
            try
            {
                peerProperty = (PeerMessageProperty)floodInfo.Properties[PeerStrings.PeerProperty];
                if(!peerProperty.MessageVerified) 
                {
                    if(peerProperty.CacheMiss > UtilityExtension.AcceptableMissDistance) 
                    { 
                        UtilityExtension.ReportCacheMiss(neighbor, peerProperty.CacheMiss);
                    } 
                    result = new CompletedAsyncResult(callback,state);
                }
                else
                { 
                    process = true;
                    messageBuffer = floodInfo.CreateBufferedCopy((int)this.config.MaxReceivedMessageSize); 
                    message = messageBuffer.CreateMessage(); 
                    via = peerProperty.PeerVia;
                    to = peerProperty.PeerTo; 
                    message.Headers.To = message.Properties.Via = via;

                    index = UpdateHopCount(message, out hopHeader, out remainingHops);
 
                    PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
                    if(peerProperty.SkipLocalChannels) 
                        propagateFlags = PeerMessagePropagation.Remote; 
                    else if(messageHandler.HasMessagePropagation)
                    { 
                        using(Message filterMessage = messageBuffer.CreateMessage())
                        {
                            propagateFlags = messageHandler.DetermineMessagePropagation(filterMessage, PeerMessageOrigination.Remote);
                        } 
                    }
 
                    if((propagateFlags & PeerMessagePropagation.Remote) != 0) 
                    {
                        if(remainingHops == 0) 
                            propagateFlags &= ~PeerMessagePropagation.Remote;
                    }
                    if((propagateFlags & PeerMessagePropagation.Remote) != 0)
                    { 
                        result = BeginFloodReceivedMessage(neighbor, messageBuffer, PeerTransportConstants.ForwardTimeout, callback, state, index, hopHeader);
                    } 
                    else 
                    {
                        result = new CompletedAsyncResult(callback,state); 
                    }
                    if((propagateFlags & PeerMessagePropagation.Local) != 0)
                    {
                        messageHandler.HandleIncomingMessage(messageBuffer, propagateFlags, index, hopHeader, via, to); 
                    }
                } 
                UtilityExtension.UpdateLinkUtility(neighbor, process); 
            }
            catch (Exception e) 
            {
                if(DiagnosticUtility.IsFatal(e))
                {
                    fatal = true; 
                    throw;
                } 
                if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor)) 
                {
                    throw; 
                }
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
            }
            finally 
            {
                if(!fatal) 
                { 
                    if(message != null)
                        message.Close(); 
                    if(messageBuffer != null)
                        messageBuffer.Close();
                }
            } 
            return result;
        } 
 
        public virtual void EndFloodMessage(IAsyncResult result)
        { 
            if(result is CompletedAsyncResult)
            {
                CompletedAsyncResult.End(result);
                return; 
            }
            FloodAsyncResult fresult = result as FloodAsyncResult; 
            DiagnosticUtility.DebugAssert(fresult != null, "Invalid AsyncResult type in EndFloodResult"); 
            fresult.End();
 
        }

        protected long MaxReceivedMessageSize
        { 
            get { return config.MaxReceivedMessageSize; }
        } 
 
        protected MessageEncoder MessageEncoder
        { 
            get { return config.MessageEncoder; }
        }

        protected object ThisLock 
        {
            get { return this.thisLock; } 
        } 

        protected List Neighbors 
        {
            get { return this.neighbors; }
        }
 
        // Guaranteed not to throw anything other than fatal exceptions
        static internal Exception CloseNeighborIfKnownException(PeerNeighborManager neighborManager, Exception exception, IPeerNeighbor peer) 
        { 
            try
            { 
                //ignore this one since the channel is already closed.
                if (exception is ObjectDisposedException)
                    return null;
                else if ( 
                    (exception is CommunicationException && !(exception.InnerException is QuotaExceededException))
                    || (exception is TimeoutException) 
                    || (exception is InvalidOperationException) 
                    || (exception is MessageSecurityException)
                ) 
                {
                    //is this the right close reason?
                    neighborManager.CloseNeighbor(peer, PeerCloseReason.InternalFailure, PeerCloseInitiator.LocalNode, exception);
                    return null; 
                }
                else 
                { 
                    //exception that we dont know or cant act on.
                    //we will throw this exception to the user. 
                    return exception;
                }
            }
            catch(Exception e) 
            {
                if(DiagnosticUtility.IsFatal(e)) throw; 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); 
                return e;
            } 
        }

        public static void EndFloodEncodedMessage(IAsyncResult result)
        { 
            CompletedAsyncResult cresult = result as CompletedAsyncResult;
            if(cresult != null) 
                CompletedAsyncResult.End(result); 
            else
            { 
                FloodAsyncResult fresult = result as FloodAsyncResult;
                if (fresult == null)
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("result", SR.GetString(SR.InvalidAsyncResult));
                fresult.End(); 
            }
        } 
 
        public void EndFloodReceivedMessage(IAsyncResult result)
        { 
            FloodAsyncResult fresult = result as FloodAsyncResult;
            DiagnosticUtility.DebugAssert(fresult != null, "Invalid FloodAsyncResult instance during EndFloodReceivedMessage");
        }
 

        public class PeerThrottleHelper 
        { 
            int outgoingEnqueuedCount = 0;
            int outgoingQuota = 128; 
            IFlooderForThrottle flooder;


            public PeerThrottleHelper(IFlooderForThrottle flooder, int outgoingLimit) 
            {
                this.outgoingQuota = outgoingLimit; 
                this.flooder = flooder; 
            }
 
            public void ItemDequeued()
            {
                Interlocked.Decrement(ref outgoingEnqueuedCount);
            } 

            public void AcquireNoQueue() 
            { 
                int value = Interlocked.Increment(ref outgoingEnqueuedCount);
                if (value >= outgoingQuota) 
                {
                    flooder.OnThrottleReached();
                }
            } 
        }
    } 
 
    class PeerFlooderSimple : PeerFlooderBase
    { 
        ListManager messageIds;
        const uint MaxBuckets = 5;

        internal PeerFlooderSimple(PeerNodeConfig config, PeerNeighborManager neighborManager) 
            : base(config, neighborManager)
        { 
            //we want a message id cache that holds message ids for atmost 5 mins. 
            this.messageIds = new ListManager(MaxBuckets);
        } 

        public override bool ShouldProcess(Message message)
        {
            return message.Properties.ContainsKey(PeerStrings.MessageVerified); 
        }
        public bool IsNotSeenBefore(Message message, out byte[] id, out int cacheHit) 
        { 
            cacheHit = -1;
            id = PeerNodeImplementation.DefaultId; 
            if(message is SecurityVerifiedMessage)
            {
                id = (message as SecurityVerifiedMessage).PrimarySignatureValue;
 
            }
            else 
            { 
                System.Xml.UniqueId messageId = PeerMessageHelpers.GetHeaderUniqueId(message.Headers, PeerStrings.MessageId, PeerStrings.Namespace);
                if(messageId == null) 
                    return false;
                if(messageId.IsGuid)
                {
                    id = new byte[16]; 
                    messageId.TryGetGuid(id, 0);
                } 
                else 
                    return false;
            } 
            cacheHit = messageIds.AddForLookup(id);
            if (cacheHit == -1)
            {
                return true; 
            }
            return false; 
 
        }
 
        public override void RecordOutgoingMessage(byte[] id)
        {
            this.messageIds.AddForFlood(id);
        } 

        public override void OnOpen() 
        { 
        }
 
        public override void OnClose()
        {
            this.messageIds.Close();
        } 

 
        public override IAsyncResult OnFloodedMessage(IPeerNeighbor neighbor, Message floodInfo, AsyncCallback callback, object state) 
        {
            return base.OnFloodedMessage(neighbor, floodInfo, callback, state); 
        }

        public override void EndFloodMessage(IAsyncResult result)
        { 
            base.EndFloodMessage(result);
 
        } 

        public override void ProcessLinkUtility(IPeerNeighbor neighbor, UtilityInfo utilityInfo) 
        {
            if (!PeerNeighborStateHelper.IsConnected(neighbor.State))
            {
                neighbor.Abort(PeerCloseReason.InvalidNeighbor, PeerCloseInitiator.LocalNode); 
                return;
            } 
 
            try
            { 
                UtilityExtension.ProcessLinkUtility(neighbor, utilityInfo);
            }
            catch (Exception e)
            { 
                if(DiagnosticUtility.IsFatal(e)) throw;
                if (null != CloseNeighborIfKnownException(neighborManager, e, neighbor)) 
                { 
                    throw;
                } 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
            }
        }
 
        class ListManager
        { 
            uint active;            //current bucket. 
            readonly uint buckets;
            bool disposed = false; 
            IOThreadTimer messagePruningTimer;
            //we service the hashtables every one minute
            static readonly int PruningTimout = 60 * 1000;
            static readonly int InitialCount = 1000; 
            Dictionary[] tables;
            //Hashtable[] tables; 
            object thisLock; 
            static NonceCache.NonceCacheImpl.NonceKeyComparer keyComparer = new NonceCache.NonceCacheImpl.NonceKeyComparer();
            const int NotFound = -1; 
            //creating this ListManager with n implies that the entries will be available for n minutes atmost.
            //in the n+1 minute, the timer message handler will kick in to clear older messages.
            //every minute, the
            public ListManager(uint buckets) 
            {
                if (!(buckets > 1)) 
                { 
                    DiagnosticUtility.DebugAssert("ListManager should be used atleast with 2 buckets");
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false); 
                }
                this.buckets = buckets;
                tables = new Dictionary[buckets];
 
                for(uint i=0;i 0; i--)
                {
                    if (tables[(active + i) % buckets].TryGetValue(key, out flooded)) 
                    {
                        if(!flooded) 
                        { 
                            tables[(active + i) % buckets][key] = true;
                            return true; 
                        }
                        else
                            return false;
                    } 
                }
                tables[active].Add(key,true); 
                return true; 
            }
 
            //it does not use locks and expects the caller to hold the lock.
            internal int Contains(byte[] key)
            {
                int cache = NotFound; 
                uint i = 0;
                //check if the message is present in any of the buckets. 
                //assumption is that a hit is likely in the current or most recent bucket. 
                //we start looking in the current active table and then in the previous and then backwards ...
                for ( i = buckets; i > 0; i--) 
                {
                    if (tables[(active + i) % buckets].ContainsKey(key))
                        cache = (int)i;
                } 
                if (cache < 0)
                    return cache; 
                cache = (int)((active+buckets - i) % buckets); 
                return cache;
            } 

            private void OnTimeout(object state)
            {
                if (disposed) 
                    return;
                lock (ThisLock) 
                { 
                    if(disposed)
                        return; 
                    active = (active + 1) % (buckets);
                    tables[active] = NewCache(tables[active].Count);
                    messagePruningTimer.Set(PruningTimout);
                } 
            }
 
            Dictionary NewCache(int capacity) 
            {
                return new Dictionary(capacity, keyComparer); 
            }
        }
    }
 

    // this class should contain a collection of IAsyncResults returned from neighbor.BeginSend 
    // and complete once all sends have completed 
    class FloodAsyncResult : AsyncResult
    { 
        bool doneAdding = false;
        Exception exception;
        PeerNeighborManager pnm;
        bool isCompleted = false; 
        //async results who signaled completion but we have not called EndSend.
        List pending = new List(); 
        Dictionary results = new Dictionary(); 
        bool shouldCallComplete = false;
        object thisLock = new object(); 
        TimeoutHelper timeoutHelper;
        bool offNode = false;
        public event EventHandler OnMessageSent;
 

        public FloodAsyncResult(PeerNeighborManager owner, TimeSpan timeout, AsyncCallback callback, object state) 
            : base(callback, state) 
        {
            this.pnm = owner; 
            this.timeoutHelper = new TimeoutHelper(timeout);
        }

        object ThisLock 
        {
            get 
            { 
                return thisLock;
            } 
        }

        public void AddResult(IAsyncResult result, IPeerNeighbor neighbor)
        { 
            lock (ThisLock)
            { 
                this.results.Add(result, neighbor); 
            }
        } 

        //user wants to end business. This method is called as a result of EndSend on the flooder.
        //internal methods do not call this. we are asserting that this method should not be called in case of failed BeginX
        public void End() 
        {
            if (!(this.doneAdding && this.shouldCallComplete)) 
            { 
                DiagnosticUtility.DebugAssert("Unexpected end!");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false); 
            }
            if (this.isCompleted)
            {
                return; 
            }
 
            //simply wait on the base's event handle 
            bool completed = TimeoutHelper.WaitOne(this.AsyncWaitHandle, this.timeoutHelper.RemainingTime(), false);
            if (!completed) 
            {
                // a time out occurred - if mo message went off node then tell AsyncResult to throw.
                if (!offNode)
                { 
                    try
                    { 
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException()); 
                    }
                    catch(Exception e) 
                    {
                        if(DiagnosticUtility.IsFatal(e)) throw;
                        DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                        this.exception = e; 
                    }
                } 
                //otherwise trace that the timeout was not sufficient for complete send 
                lock (ThisLock)
                { 
                    if (this.isCompleted)
                        return;
                    this.isCompleted = true;
                } 
                CompleteOp(false);
            } 
            AsyncResult.End(this); 
        }
 
        //this method marks the end of BeginX by the flooder.
        //if there were errors during BeginX, this method may be prematurely called
        //in this case, our only job is to call EndX on successful BeginX calls. we do not report back to caller in this case.
        //base.Complete will not be called and End() will not be called. User has already received exception during BeginX 
        //if there was no exception during BeginX, excep param is null. In this case, we call base.Complete upon the last EndX
        public void MarkEnd(bool success) 
        { 
            bool callComplete = false;
            try 
            {
                lock (this.ThisLock)
                {
                    foreach (IAsyncResult result in pending) 
                    {
                        OnSendComplete(result); 
                    } 
                    pending.Clear();
                    this.doneAdding = true; 
                    this.shouldCallComplete = success; //only call base.Complete if there is no error during BeginX
                    if (this.results.Count == 0)
                    {
                        this.isCompleted = true; 
                        callComplete = true;
                    } 
                } 
            }
            finally 
            {
                if (callComplete)
                {
                    CompleteOp(true); 
                }
            } 
 
        }
 

        //this is the callback routine for async completion on channel BeginSend() operations.
        //if we are done, simply return. This can happen if user called sync EndX.
        //if the flooder is still processing BeginSend(), then we probably cant complete. In this case, add the result to pending and return 
        //main thread will flush the pending completions in MarkEnd().
        //otherwise, call EndX on the result and remove it from results. 
        //if this is the last invoke, signal user using base.Complete AND isCompleted=true 
        internal void OnSendComplete(IAsyncResult result)
        { 
            bool callComplete = false;
            IPeerNeighbor neighbor = null;
            bool fatal = false;
            if (isCompleted) 
                return;
            Message message = (Message)result.AsyncState; 
 
            //wait until flooder had a chance to call all outgoing channels and give us Async results.
            lock (ThisLock) 
            {
                if (isCompleted)
                    return;
 
                if (!this.results.TryGetValue(result, out neighbor))
                { 
                    if (!doneAdding) 
                        this.pending.Add(result);
                    else 
                    {
                        DiagnosticUtility.DebugAssert("IAsyncResult is un-accounted for.");
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false);
                    } 
                    return;
                } 
                this.results.Remove(result); 

                try 
                {
                    //try doing this only if the async result is marked !CompletedSynchronously.
                    if (!result.CompletedSynchronously)
                    { 
                        neighbor.EndSend(result);
                        offNode = true; 
                        UtilityExtension.OnEndSend(neighbor, this); 
                    }
                } 
                catch (Exception e)
                {
                    if(DiagnosticUtility.IsFatal(e))
                    { 
                        fatal = true;
                        throw; 
                    } 

                    Exception temp = PeerFlooder.CloseNeighborIfKnownException(pnm, e, neighbor); 
                    //we want to return the very first exception to the user.
                    if (temp != null && this.doneAdding && !this.shouldCallComplete)
                        throw;
                    if (this.exception == null) 
                    {
                        this.exception = temp; 
                    } 
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                } 
                finally
                {
                    if(message != null && !result.CompletedSynchronously && !fatal)
                        message.Close(); 
                }
                //dont want to call Complete from the lock. 
                //we just decide if this thread should call complete and call outside the lock. 
                if (this.results.Count == 0 && this.doneAdding && this.shouldCallComplete)
                { 
                    this.isCompleted = true;
                    callComplete = true;
                }
            } 
            //if we are done with callbacks and beginx calls,
            if (callComplete && this.shouldCallComplete) 
            { 
                CompleteOp(false);
            } 
        }

        void CompleteOp(bool sync)
        { 
            //call the callback upon finish
            OnMessageSent(this, EventArgs.Empty); 
            base.Complete(sync, this.exception); 
        }
 
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK