ReliableReplySessionChannel.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / ReliableReplySessionChannel.cs / 1 / ReliableReplySessionChannel.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Channels
{ 
    using System.Collections.Generic;
    using System.ServiceModel; 
    using System.ServiceModel.Diagnostics; 
    using System.Threading;
    using System.Xml; 

    // Note on locking:
    // The following rule must be followed in order to avoid deadlocks: ReliableRequestContext
    // locks MUST NOT be taken while under the ReliableReplySessionChannel lock. 
    //
    // lock(context)-->lock(channel) ok. 
    // lock(channel)-->lock(context) BAD! 
    //
    sealed class ReliableReplySessionChannel : ReplyChannel, IReplySessionChannel 
    {
        List acked = new List();
        static WaitCallback asyncReceiveComplete = new WaitCallback(AsyncReceiveCompleteStatic);
        IServerReliableChannelBinder binder; 
        ReplyHelper closeSequenceReplyHelper;
        ReliableInputConnection connection; 
        bool contextAborted; 
        DeliveryStrategy deliveryStrategy;
        ReliableRequestContext lastReply; 
        bool lastReplyAcked;
        Int64 lastReplySequenceNumber = Int64.MinValue;
        ReliableChannelListenerBase listener;
        InterruptibleWaitObject messagingCompleteWaitObject; 
        Int64 nextReplySequenceNumber;
        static AsyncCallback onReceiveCompleted = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReceiveCompletedStatic)); 
        string perfCounterId; 
        Dictionary requestsByRequestSequenceNumber = new Dictionary();
        Dictionary requestsByReplySequenceNumber = new Dictionary(); 
        ServerReliableSession session;
        ReplyHelper terminateSequenceReplyHelper;

        public ReliableReplySessionChannel( 
            ReliableChannelListenerBase listener,
            IServerReliableChannelBinder binder, 
            FaultHelper faultHelper, 
            UniqueId inputID,
            UniqueId outputID) 
            : base(listener, binder.LocalAddress)
        {
            this.listener = listener;
            this.connection = new ReliableInputConnection(); 
            this.connection.ReliableMessagingVersion = this.listener.ReliableMessagingVersion;
            this.binder = binder; 
            this.session = new ServerReliableSession(this, listener, binder, faultHelper, inputID, outputID); 
            this.session.UnblockChannelCloseCallback = this.UnblockClose;
 
            if (this.listener.Ordered)
                this.deliveryStrategy = new OrderedDeliveryStrategy(this, this.listener.MaxTransferWindowSize, true);
            else
                this.deliveryStrategy = new UnorderedDeliveryStrategy(this, this.listener.MaxTransferWindowSize); 
            this.binder.Faulted += OnBinderFaulted;
            this.binder.OnException += OnBinderException; 
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005) 
            {
                this.messagingCompleteWaitObject = new InterruptibleWaitObject(false); 
            }
            this.session.Open(TimeSpan.Zero);

            if (PerformanceCounters.PerformanceCountersEnabled) 
                this.perfCounterId = this.listener.Uri.ToString().ToUpperInvariant();
 
            if (binder.HasSession) 
            {
                try 
                {
                    this.StartReceiving(false);
                }
#pragma warning suppress 56500 // covered by FxCOP 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    }

                    this.session.OnUnknownException(e);
                } 
            }
        } 
 
        public IServerReliableChannelBinder Binder
        { 
            get
            {
                return this.binder;
            } 
        }
 
        bool IsMessagingCompleted 
        {
            get 
            {
                lock (this.ThisLock)
                {
                    return this.connection.AllAdded && (this.requestsByRequestSequenceNumber.Count == 0) && this.lastReplyAcked; 
                }
            } 
        } 

        MessageVersion MessageVersion 
        {
            get
            {
                return this.listener.MessageVersion; 
            }
        } 
 
        int PendingRequestContexts
        { 
            get
            {
                lock (this.ThisLock)
                { 
                    return (this.requestsByRequestSequenceNumber.Count - this.requestsByReplySequenceNumber.Count);
                } 
            } 
        }
 
        public IInputSession Session
        {
            get
            { 
                return this.session;
            } 
        } 

        void AbortContexts() 
        {
            lock (this.ThisLock)
            {
                if (this.contextAborted) 
                    return;
                this.contextAborted = true; 
            } 

            Dictionary.ValueCollection contexts = this.requestsByRequestSequenceNumber.Values; 

            foreach (ReliableRequestContext request in contexts)
            {
                request.Abort(); 
            }
 
            this.requestsByRequestSequenceNumber.Clear(); 
            this.requestsByReplySequenceNumber.Clear();
 

            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                if (this.lastReply != null) 
                    this.lastReply.Abort();
            } 
        } 

        void AddAcknowledgementHeader(Message message) 
        {
            WsrmUtilities.AddAcknowledgementHeader(
                this.listener.ReliableMessagingVersion,
                message, 
                this.session.InputID,
                this.connection.Ranges, 
                this.connection.IsLastKnown, 
                this.listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount);
        } 

        static void AsyncReceiveCompleteStatic(object state)
        {
            IAsyncResult result = (IAsyncResult)state; 
            ReliableReplySessionChannel channel = (ReliableReplySessionChannel)(result.AsyncState);
            try 
            { 
                if (channel.HandleReceiveComplete(result))
                { 
                    channel.StartReceiving(true);
                }
            }
#pragma warning suppress 56500 // covered by FxCOP 
            catch (Exception e)
            { 
                if (DiagnosticUtility.IsFatal(e)) 
                {
                    throw; 
                }

                channel.session.OnUnknownException(e);
            } 
        }
 
        IAsyncResult BeginCloseBinder(TimeSpan timeout, AsyncCallback callback, object state) 
        {
            return this.binder.BeginClose(timeout, MaskingMode.Handled, callback, state); 
        }

        IAsyncResult BeginCloseOutput(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            { 
                ReliableRequestContext reply = this.lastReply; 

                if (reply == null) 
                    return new CloseOutputCompletedAsyncResult(callback, state);
                else
                    return reply.BeginReplyInternal(null, timeout, callback, state);
            } 
            else
            { 
                lock (this.ThisLock) 
                {
                    this.ThrowIfClosed(); 
                    this.CreateCloseSequenceReplyHelper();
                }

                return this.closeSequenceReplyHelper.BeginWaitAndReply(timeout, callback, state); 
            }
        } 
 
        IAsyncResult BeginUnregisterChannel(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            return this.listener.OnReliableChannelBeginClose(this.session.InputID,
                this.session.OutputID, timeout, callback, state);
        }
 
        Message CreateAcknowledgement(SequenceRangeCollection ranges)
        { 
            Message message = WsrmUtilities.CreateAcknowledgmentMessage( 
                this.MessageVersion,
                this.listener.ReliableMessagingVersion, 
                this.session.InputID,
                ranges,
                this.connection.IsLastKnown,
                this.listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount); 

            return message; 
        } 

        Message CreateSequenceClosedFault() 
        {
            Message message = new SequenceClosedFault(this.session.InputID).CreateMessage(
                this.listener.MessageVersion, this.listener.ReliableMessagingVersion);
            this.AddAcknowledgementHeader(message); 
            return message;
        } 
 
        bool CreateCloseSequenceReplyHelper()
        { 
            if (this.State == CommunicationState.Faulted || this.Aborted)
            {
                return false;
            } 

            if (this.closeSequenceReplyHelper == null) 
            { 
                this.closeSequenceReplyHelper = new ReplyHelper(this, CloseSequenceReplyProvider.Instance,
                    true); 
            }

            return true;
        } 

        bool CreateTerminateSequenceReplyHelper() 
        { 
            if (this.State == CommunicationState.Faulted || this.Aborted)
            { 
                return false;
            }

            if (this.terminateSequenceReplyHelper == null) 
            {
                this.terminateSequenceReplyHelper = new ReplyHelper(this, 
                    TerminateSequenceReplyProvider.Instance, false); 
            }
 
            return true;
        }

        void CloseOutput(TimeSpan timeout) 
        {
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005) 
            { 
                ReliableRequestContext reply = this.lastReply;
 
                if (reply != null)
                    reply.ReplyInternal(null, timeout);
            }
            else 
            {
                lock (this.ThisLock) 
                { 
                    this.ThrowIfClosed();
                    this.CreateCloseSequenceReplyHelper(); 
                }

                this.closeSequenceReplyHelper.WaitAndReply(timeout);
            } 
        }
 
        bool ContainsRequest(Int64 requestSeqNum) 
        {
            lock (this.ThisLock) 
            {
                bool haveRequestInDictionary = this.requestsByRequestSequenceNumber.ContainsKey(requestSeqNum);

                if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005) 
                {
                    return (haveRequestInDictionary 
                        || ((this.lastReply != null) && (this.lastReply.RequestSequenceNumber == requestSeqNum) && (!this.lastReplyAcked))); 
                }
                else 
                {
                    return haveRequestInDictionary;
                }
            } 
        }
 
        void EndCloseBinder(IAsyncResult result) 
        {
            this.binder.EndClose(result); 
        }

        void EndCloseOutput(IAsyncResult result)
        { 
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            { 
                if (result is CloseOutputCompletedAsyncResult) 
                    CloseOutputCompletedAsyncResult.End(result);
                else 
                    this.lastReply.EndReplyInternal(result);
            }
            else
            { 
                this.closeSequenceReplyHelper.EndWaitAndReply(result);
            } 
        } 

        void EndUnregisterChannel(IAsyncResult result) 
        {
            this.listener.OnReliableChannelEndClose(result);
        }
 
        public override T GetProperty()
        { 
            if (typeof(T) == typeof(IReplySessionChannel)) 
            {
                return (T)(object)this; 
            }

            T baseProperty = base.GetProperty();
 
            if (baseProperty != null)
            { 
                return baseProperty; 
            }
 
            T innerProperty = this.binder.Channel.GetProperty();
            if ((innerProperty == null) && (typeof(T) == typeof(FaultConverter)))
            {
                return (T)(object)FaultConverter.GetDefaultFaultConverter(this.listener.MessageVersion); 
            }
            else 
            { 
                return innerProperty;
            } 
        }

        bool HandleReceiveComplete(IAsyncResult result)
        { 
            RequestContext context;
 
            if (this.Binder.EndTryReceive(result, out context)) 
            {
                if (context == null) 
                {
                    bool terminated = false;

                    lock (this.ThisLock) 
                    {
                        terminated = this.connection.Terminate(); 
                    } 

                    if (!terminated && (this.Binder.State == CommunicationState.Opened)) 
                    {
                        Exception e = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
                        this.session.OnLocalFault(e, (Message)null, null);
                    } 

                    return false; 
                } 

                WsrmMessageInfo info = WsrmMessageInfo.Get(this.listener.MessageVersion, 
                    this.listener.ReliableMessagingVersion, this.binder.Channel, this.binder.GetInnerSession(),
                    context.RequestMessage);

                this.StartReceiving(false); 
                this.ProcessRequest(context, info);
                return false; 
            } 

            return true; 
        }

        protected override void OnAbort()
        { 
            if (this.closeSequenceReplyHelper != null)
            { 
                this.closeSequenceReplyHelper.Abort(); 
            }
 
            this.connection.Abort(this);
            if (this.terminateSequenceReplyHelper != null)
            {
                this.terminateSequenceReplyHelper.Abort(); 
            }
            this.session.Abort(); 
            this.AbortContexts(); 
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            { 
                this.messagingCompleteWaitObject.Abort(this);
            }
            this.listener.OnReliableChannelAbort(this.session.InputID, this.session.OutputID);
            base.OnAbort(); 
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) 
        {
            this.ThrowIfCloseInvalid(); 
            bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;

            OperationWithTimeoutBeginCallback[] beginOperations =
                new OperationWithTimeoutBeginCallback[] { 
                    new OperationWithTimeoutBeginCallback (this.BeginCloseOutput),
                    wsrmFeb2005 
                    ? new OperationWithTimeoutBeginCallback(this.connection.BeginClose) 
                    : new OperationWithTimeoutBeginCallback(this.BeginTerminateSequence),
                    wsrmFeb2005 
                    ? new OperationWithTimeoutBeginCallback(this.messagingCompleteWaitObject.BeginWait)
                    : new OperationWithTimeoutBeginCallback(this.connection.BeginClose),
                    new OperationWithTimeoutBeginCallback(this.session.BeginClose),
                    new OperationWithTimeoutBeginCallback(this.BeginCloseBinder), 
                    new OperationWithTimeoutBeginCallback(this.BeginUnregisterChannel),
                    new OperationWithTimeoutBeginCallback(base.OnBeginClose) 
                }; 

            OperationEndCallback[] endOperations = 
                new OperationEndCallback[] {
                    new OperationEndCallback(this.EndCloseOutput),
                    wsrmFeb2005
                    ? new OperationEndCallback(this.connection.EndClose) 
                    : new OperationEndCallback(this.EndTerminateSequence),
                    wsrmFeb2005 
                    ? new OperationEndCallback(this.messagingCompleteWaitObject.EndWait) 
                    : new OperationEndCallback(this.connection.EndClose),
                    new OperationEndCallback(this.session.EndClose), 
                    new OperationEndCallback(this.EndCloseBinder),
                    new OperationEndCallback(this.EndUnregisterChannel),
                    new OperationEndCallback(base.OnEndClose)
                }; 

            return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout, 
                beginOperations, endOperations, callback, state); 
        }
 
        void OnBinderException(IReliableChannelBinder sender, Exception exception)
        {
            if (exception is QuotaExceededException)
                this.session.OnLocalFault(exception, (Message)null, null); 
            else
                this.EnqueueAndDispatch(exception, null, false); 
        } 

        void OnBinderFaulted(IReliableChannelBinder sender, Exception exception) 
        {
            this.binder.Abort();

            exception = new CommunicationException(SR.GetString(SR.EarlySecurityFaulted), exception); 
            this.session.OnLocalFault(exception, (Message)null, null);
        } 
 
        protected override void OnClose(TimeSpan timeout)
        { 
            this.ThrowIfCloseInvalid();
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            this.CloseOutput(timeoutHelper.RemainingTimeExpireZero());
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005) 
            {
                this.connection.Close(timeoutHelper.RemainingTime()); 
                this.messagingCompleteWaitObject.Wait(timeoutHelper.RemainingTime()); 
            }
            else 
            {
                this.TerminateSequence(timeoutHelper.RemainingTime());
                this.connection.Close(timeoutHelper.RemainingTime());
            } 
            this.session.Close(timeoutHelper.RemainingTime());
            this.binder.Close(timeoutHelper.RemainingTime(), MaskingMode.Handled); 
            this.listener.OnReliableChannelClose(this.session.InputID, this.session.OutputID, 
                timeoutHelper.RemainingTime());
            base.OnClose(timeoutHelper.RemainingTime()); 
        }

        protected override void OnClosed()
        { 
            this.deliveryStrategy.Dispose();
            this.binder.Faulted -= this.OnBinderFaulted; 
 
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            { 
                if (this.lastReply != null)
                {
                    this.lastReply.Abort();
                } 
            }
 
            base.OnClosed(); 
        }
 
        protected override void OnEndClose(IAsyncResult result)
        {
            OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
        } 

        protected override void OnFaulted() 
        { 
            this.session.OnFaulted();
            this.UnblockClose(); 
            base.OnFaulted();
            if (PerformanceCounters.PerformanceCountersEnabled)
                PerformanceCounters.SessionFaulted(this.perfCounterId);
        } 

        static void OnReceiveCompletedStatic(IAsyncResult result) 
        { 
            if (result.CompletedSynchronously)
                return; 
            ReliableReplySessionChannel channel = (ReliableReplySessionChannel)(result.AsyncState);

            try
            { 
                if (channel.HandleReceiveComplete(result))
                { 
                    channel.StartReceiving(true); 
                }
            } 
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (DiagnosticUtility.IsFatal(e)) 
                {
                    throw; 
                } 

                channel.session.OnUnknownException(e); 
            }
        }

        void OnTerminateSequenceCompleted() 
        {
            if ((this.session.Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11) 
                && this.connection.IsSequenceClosed) 
            {
                lock (this.ThisLock) 
                {
                    this.connection.Terminate();
                }
            } 
        }
 
        bool PrepareReply(ReliableRequestContext context) 
        {
            lock (this.ThisLock) 
            {
                if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
                    return false;
 
                long requestSequenceNumber = context.RequestSequenceNumber;
                bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005; 
 
                if (wsrmFeb2005 && (this.connection.Last == requestSequenceNumber))
                { 
                    if (this.lastReply == null)
                        this.lastReply = context;
                    this.requestsByRequestSequenceNumber.Remove(requestSequenceNumber);
                    bool canReply = this.connection.AllAdded && (this.State == CommunicationState.Closing); 
                    if (!canReply)
                        return false; 
                } 
                else
                { 
                    if (this.State == CommunicationState.Closing)
                        return false;

                    if (!context.HasReply) 
                    {
                        this.requestsByRequestSequenceNumber.Remove(requestSequenceNumber); 
                        return true; 
                    }
                } 

                // won't throw if you do not need next sequence number
                if (this.nextReplySequenceNumber == Int64.MaxValue)
                { 
                    MessageNumberRolloverFault fault = new MessageNumberRolloverFault(this.session.OutputID);
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException()); 
                } 
                context.SetReplySequenceNumber(++this.nextReplySequenceNumber);
 
                if (wsrmFeb2005 && (this.connection.Last == requestSequenceNumber))
                {
                    if (!context.HasReply)
                        this.lastReplyAcked = true;   //If Last Reply has no user data, it does not need to be acked. Here we just set it as its ack received. 
                    this.lastReplySequenceNumber = this.nextReplySequenceNumber;
                    context.SetLastReply(this.lastReplySequenceNumber); 
                } 
                else if (context.HasReply)
                { 
                    this.requestsByReplySequenceNumber.Add(this.nextReplySequenceNumber, context);
                }

                return true; 
            }
        } 
 
        Message PrepareReplyMessage(Int64 replySequenceNumber, bool isLast, SequenceRangeCollection ranges, Message reply)
        { 
            this.AddAcknowledgementHeader(reply);

            WsrmUtilities.AddSequenceHeader(
                this.listener.ReliableMessagingVersion, 
                reply,
                this.session.OutputID, 
                replySequenceNumber, 
                isLast);
 
            return reply;
        }

        void ProcessAcknowledgment(WsrmAcknowledgmentInfo info) 
        {
            lock (this.ThisLock) 
            { 
                if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
                    return; 

                if (this.requestsByReplySequenceNumber.Count > 0)
                {
                    Int64 reply; 

                    this.acked.Clear(); 
 
                    foreach (KeyValuePair pair in this.requestsByReplySequenceNumber)
                    { 
                        reply = pair.Key;
                        if (info.Ranges.Contains(reply))
                        {
                            this.acked.Add(reply); 
                        }
                    } 
 
                    for (int i = 0; i < this.acked.Count; i++)
                    { 
                        reply = this.acked[i];
                        this.requestsByRequestSequenceNumber.Remove(
                            this.requestsByReplySequenceNumber[reply].RequestSequenceNumber);
                        this.requestsByReplySequenceNumber.Remove(reply); 
                    }
 
                    if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005) 
                    {
                        if (!this.lastReplyAcked && (this.lastReplySequenceNumber != Int64.MinValue)) 
                        {
                            this.lastReplyAcked = info.Ranges.Contains(this.lastReplySequenceNumber);
                        }
                    } 
                }
            } 
        } 

        void ProcessAckRequested(RequestContext context) 
        {
            try
            {
                using (Message reply = CreateAcknowledgement(this.connection.Ranges)) 
                {
                    context.Reply(reply); 
                } 
            }
            finally 
            {
                context.RequestMessage.Close();
                context.Close();
            } 
        }
 
        void ProcessShutdown11(RequestContext context, WsrmMessageInfo info) 
        {
            bool cleanup = true; 

            try
            {
                bool isTerminate = (info.TerminateSequenceInfo != null); 
                WsrmRequestInfo requestInfo = isTerminate
                    ? (WsrmRequestInfo)info.TerminateSequenceInfo 
                    : (WsrmRequestInfo)info.CloseSequenceInfo; 
                Int64 last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;
 
                if (!WsrmUtilities.ValidateWsrmRequest(this.session, requestInfo, this.binder, context))
                {
                    cleanup = false;
                    return; 
                }
 
                bool scheduleShutdown = false; 
                Exception remoteFaultException = null;
                ReplyHelper closeHelper = null; 
                bool haveAllReplyAcks = true;
                bool isLastLargeEnough = true;
                bool isLastConsistent = true;
 
                lock (this.ThisLock)
                { 
                    if (!this.connection.IsLastKnown) 
                    {
                        // All requests and replies must be acknowledged. 
                        if (this.requestsByRequestSequenceNumber.Count == 0)
                        {
                            if (isTerminate)
                            { 
                                if (this.connection.SetTerminateSequenceLast(last, out isLastLargeEnough))
                                { 
                                    scheduleShutdown = true; 
                                }
                                else if (isLastLargeEnough) 
                                {
                                    remoteFaultException = new ProtocolException(SR.GetString(SR.EarlyTerminateSequence));
                                }
                            } 
                            else
                            { 
                                scheduleShutdown = this.connection.SetCloseSequenceLast(last); 
                                isLastLargeEnough = scheduleShutdown;
                            } 

                            if (scheduleShutdown)
                            {
                                // (1) !isTerminate && !IsLastKnown, CloseSequence received before TerminateSequence. 
                                // - Need to ensure helper to delay the reply until Close.
                                // (2) isTerminate && !IsLastKnown, TerminateSequence received before CloseSequence. 
                                // - Close not required, ensure it is created so we can bypass it. 
                                if (!this.CreateCloseSequenceReplyHelper())
                                { 
                                    return;
                                }

                                // Capture the helper in order to unblock it. 
                                if (isTerminate)
                                { 
                                    closeHelper = this.closeSequenceReplyHelper; 
                                }
 
                                this.session.SetFinalAck(this.connection.Ranges);
                                this.deliveryStrategy.Dispose();
                            }
                        } 
                        else
                        { 
                            haveAllReplyAcks = false; 
                        }
                    } 
                    else
                    {
                        isLastConsistent = (last == this.connection.Last);
                    } 
                }
 
                WsrmFault fault = null; 

                if (!isLastLargeEnough) 
                {
                    string faultString = SR.GetString(SR.SequenceTerminatedSmallLastMsgNumber);
                    string exceptionString = SR.GetString(SR.SmallLastMsgNumberExceptionString);
                    fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID, faultString, 
                        exceptionString);
                } 
                else if (!haveAllReplyAcks) 
                {
                    string faultString = SR.GetString(SR.SequenceTerminatedNotAllRepliesAcknowledged); 
                    string exceptionString = SR.GetString(SR.NotAllRepliesAcknowledgedExceptionString);
                    fault = SequenceTerminatedFault.CreateProtocolFault(this.session.OutputID, faultString,
                        exceptionString);
                } 
                else if (!isLastConsistent)
                { 
                    string faultString = SR.GetString(SR.SequenceTerminatedInconsistentLastMsgNumber); 
                    string exceptionString = SR.GetString(SR.InconsistentLastMsgNumberExceptionString);
                    fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID, 
                        faultString, exceptionString);
                }
                else if (remoteFaultException != null)
                { 
                    Message message = WsrmUtilities.CreateTerminateMessage(this.MessageVersion,
                        this.listener.ReliableMessagingVersion, this.session.OutputID); 
                    this.AddAcknowledgementHeader(message); 

                    using (message) 
                    {
                        context.Reply(message);
                    }
 
                    this.session.OnRemoteFault(remoteFaultException);
                    return; 
                } 

                if (fault != null) 
                {
                    this.session.OnLocalFault(fault.CreateException(), fault, context);
                    cleanup = false;
                    return; 
                }
 
                if (isTerminate) 
                {
                    if (closeHelper != null) 
                    {
                        closeHelper.UnblockWaiter();
                    }
 
                    lock (this.ThisLock)
                    { 
                        if (!this.CreateTerminateSequenceReplyHelper()) 
                        {
                            return; 
                        }
                    }
                }
 
                ReplyHelper replyHelper = isTerminate ? this.terminateSequenceReplyHelper : this.closeSequenceReplyHelper;
 
                if (!replyHelper.TransferRequestContext(context, info)) 
                {
                    replyHelper.Reply(context, info, this.DefaultSendTimeout, MaskingMode.All); 

                    if (isTerminate)
                    {
                        this.OnTerminateSequenceCompleted(); 
                    }
                } 
                else 
                {
                    cleanup = false; 
                }

                if (scheduleShutdown)
                { 
                    IOThreadScheduler.ScheduleCallback(this.ShutdownCallback, null);
                } 
            } 
            finally
            { 
                if (cleanup)
                {
                    context.RequestMessage.Close();
                    context.Close(); 
                }
            } 
        } 

        public void ProcessDemuxedRequest(RequestContext context, WsrmMessageInfo info) 
        {
            try
            {
                this.ProcessRequest(context, info); 
            }
#pragma warning suppress 56500 // covered by FxCOP 
            catch (Exception e) 
            {
                if (DiagnosticUtility.IsFatal(e)) 
                    throw;

                this.session.OnUnknownException(e);
            } 
        }
 
        void ProcessRequest(RequestContext context, WsrmMessageInfo info) 
        {
            bool closeMessage = true; 
            bool closeContext = true;

            try
            { 
                if (!this.session.ProcessInfo(info, context))
                { 
                    closeMessage = false; 
                    closeContext = false;
                    return; 
                }

                if (!this.session.VerifyDuplexProtocolElements(info, context))
                { 
                    closeMessage = false;
                    closeContext = false; 
                    return; 
                }
 
                this.session.OnRemoteActivity(false);

                if (info.CreateSequenceInfo != null)
                { 
                    EndpointAddress acksTo;
 
                    if (WsrmUtilities.ValidateCreateSequence(info, this.listener, this.binder.Channel, out acksTo)) 
                    {
                        Message response = WsrmUtilities.CreateCreateSequenceResponse(this.listener.MessageVersion, 
                            this.listener.ReliableMessagingVersion, true, info.CreateSequenceInfo,
                            this.listener.Ordered, this.session.InputID, acksTo);

                        using (context) 
                        {
                            using (response) 
                            { 
                                if (this.Binder.AddressResponse(info.Message, response))
                                    context.Reply(response, this.DefaultSendTimeout); 
                            }
                        }
                    }
                    else 
                    {
                        this.session.OnLocalFault(info.FaultException, info.FaultReply, context); 
                    } 

                    closeContext = false; 
                    return;
                }

                closeContext = false; 
                if (info.AcknowledgementInfo != null)
                { 
                    ProcessAcknowledgment(info.AcknowledgementInfo); 
                    closeContext = (info.Action == WsrmIndex.GetSequenceAcknowledgementActionString(this.listener.ReliableMessagingVersion));
                } 

                if (!closeContext)
                {
                    closeMessage = false; 
                    if (info.SequencedMessageInfo != null)
                    { 
                        ProcessSequencedMessage(context, info.Action, info.SequencedMessageInfo); 
                    }
                    else if (info.TerminateSequenceInfo != null) 
                    {
                        if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
                        {
                            ProcessTerminateSequenceFeb2005(context, info); 
                        }
                        else if (info.TerminateSequenceInfo.Identifier == this.session.InputID) 
                        { 
                            ProcessShutdown11(context, info);
                        } 
                        else    // Identifier == OutputID
                        {
                            WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                                SR.GetString(SR.SequenceTerminatedUnsupportedTerminateSequence), 
                                SR.GetString(SR.UnsupportedTerminateSequenceExceptionString));
 
                            this.session.OnLocalFault(fault.CreateException(), fault, context); 
                            closeMessage = false;
                            closeContext = false; 
                            return;
                        }
                    }
                    else if (info.CloseSequenceInfo != null) 
                    {
                        ProcessShutdown11(context, info); 
                    } 
                    else if (info.AckRequestedInfo != null)
                    { 
                        ProcessAckRequested(context);
                    }
                }
 
                if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
                { 
                    if (this.IsMessagingCompleted) 
                    {
                        this.messagingCompleteWaitObject.Set(); 
                    }
                }
            }
            finally 
            {
                if (closeMessage) 
                    info.Message.Close(); 

                if (closeContext) 
                    context.Close();
            }
        }
 
        // A given reliable request can be in one of three states:
        // 1. Known and Processing: A ReliableRequestContext exists in requestTable but the outcome for 
        //      for the request is unknown. Any transport request referencing this reliable request 
        //      (by means of the sequence number) must be held until the outcome becomes known.
        // 2. Known and Processed: A ReliableRequestContext exists in the requestTable and the outcome for 
        //      for the request is known. The ReliableRequestContext holds that outcome. Any transport requests
        //      referening this reliable request must send the response dictated by the outcome.
        // 3. Unknown: No ReliableRequestContext exists in the requestTable for the referenced reliable request.
        //      In this case a new ReliableRequestContext is added to the requestTable to await some outcome. 
        //
        // There are 4 possible outcomes for a reliable request: 
        //  a. It is captured and the user replies. Transport replies are then copies of the user's reply. 
        //  b. It is captured and the user closes the context. Transport replies are then acknowledgments
        //      that include the sequence number of the reliable request. 
        //  c. It is captured and and the user aborts the context. Transport contexts are then aborted.
        //  d. It is not captured. In this case an acknowledgment that includes all sequence numbers
        //      previously captured is sent. Note two sub-cases here:
        //          1. It is not captured because it is dropped (e.g. it doesn't fit in the buffer). In this 
        //              case the reliable request's sequence number is not in the acknowledgment.
        //          2. It is not captured because it is a duplicate. In this case the reliable request's 
        //              sequence number is included in the acknowledgment. 
        //
        // By following these rules it is possible to support one-way and two-operations without having 
        // knowledge of them (the user drives using the request context we give them) and at the same time
        // it is possible to forget about past replies once acknowledgments for them are received.
        void ProcessSequencedMessage(RequestContext context, string action, WsrmSequencedMessageInfo info)
        { 
            ReliableRequestContext reliableContext = null;
            WsrmFault fault = null; 
            bool needDispatch = false; 
            bool scheduleShutdown = false;
            bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005; 
            bool wsrm11 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
            Int64 requestSequenceNumber = info.SequenceNumber;
            bool isLast = wsrmFeb2005 && info.LastMessage;
            bool isLastOnly = wsrmFeb2005 && (action == WsrmFeb2005Strings.LastMessageAction); 
            bool isDupe;
            Message message = null; 
 
            lock (this.ThisLock)
            { 
                if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
                {
                    context.RequestMessage.Close();
                    context.Abort(); 
                    return;
                } 
 
                isDupe = this.connection.Ranges.Contains(requestSequenceNumber);
 
                if (!this.connection.IsValid(requestSequenceNumber, isLast))
                {
                    if (wsrmFeb2005)
                    { 
                        fault = new LastMessageNumberExceededFault(this.session.InputID);
                    } 
                    else 
                    {
                        message = this.CreateSequenceClosedFault(); 

                        if (PerformanceCounters.PerformanceCountersEnabled)
                            PerformanceCounters.MessageDropped(this.perfCounterId);
                    } 
                }
                else if (isDupe) 
                { 
                    if (PerformanceCounters.PerformanceCountersEnabled)
                        PerformanceCounters.MessageDropped(this.perfCounterId); 

                    if (!this.requestsByRequestSequenceNumber.TryGetValue(info.SequenceNumber, out reliableContext))
                    {
                        if ((this.lastReply != null) && (this.lastReply.RequestSequenceNumber == info.SequenceNumber)) 
                            reliableContext = this.lastReply;
                        else 
                            reliableContext = new ReliableRequestContext(context, info.SequenceNumber, this, true); 
                    }
 
                    reliableContext.SetAckRanges(this.connection.Ranges);
                }
                else if ((this.State == CommunicationState.Closing) && !isLastOnly)
                { 
                    if (wsrmFeb2005)
                    { 
                        fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID, 
                            SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
                            SR.GetString(SR.SessionClosedBeforeDone)); 
                    }
                    else
                    {
                        message = this.CreateSequenceClosedFault(); 
                        if (PerformanceCounters.PerformanceCountersEnabled)
                            PerformanceCounters.MessageDropped(this.perfCounterId); 
                    } 
                }
                // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the 
                // serialized ack size and the amount of memory taken by the ack ranges. In the
                // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
                // threat.
                else if (this.deliveryStrategy.CanEnqueue(requestSequenceNumber) 
                    && (this.requestsByReplySequenceNumber.Count < this.listener.MaxTransferWindowSize)
                    && (this.listener.Ordered || this.connection.CanMerge(requestSequenceNumber))) 
                { 
                    this.connection.Merge(requestSequenceNumber, isLast);
                    reliableContext = new ReliableRequestContext(context, info.SequenceNumber, this, false); 
                    reliableContext.SetAckRanges(this.connection.Ranges);

                    if (!isLastOnly)
                    { 
                        needDispatch = this.deliveryStrategy.Enqueue(reliableContext, requestSequenceNumber);
                        this.requestsByRequestSequenceNumber.Add(info.SequenceNumber, reliableContext); 
                    } 
                    else
                    { 
                        this.lastReply = reliableContext;
                    }

                    scheduleShutdown = this.connection.AllAdded; 
                }
                else 
                { 
                    if (PerformanceCounters.PerformanceCountersEnabled)
                        PerformanceCounters.MessageDropped(this.perfCounterId); 
                }
            }

            if (fault != null) 
            {
                this.session.OnLocalFault(fault.CreateException(), fault, context); 
                return; 
            }
 
            if (reliableContext == null)
            {
                if (message != null)
                { 
                    using (message)
                    { 
                        context.Reply(message); 
                    }
                } 

                context.RequestMessage.Close();
                context.Close();
                return; 
            }
 
            if (isDupe && reliableContext.CheckForReplyOrAddInnerContext(context)) 
            {
                reliableContext.SendReply(context, MaskingMode.All); 
                return;
            }

            if (!isDupe && isLastOnly) 
            {
                reliableContext.Close(); 
            } 

            if (needDispatch) 
            {
                this.Dispatch();
            }
 
            if (scheduleShutdown)
            { 
                IOThreadScheduler.ScheduleCallback(this.ShutdownCallback, null); 
            }
        } 

        void ProcessTerminateSequenceFeb2005(RequestContext context, WsrmMessageInfo info)
        {
            bool cleanup = true; 

            try 
            { 
                Message message = null;
                bool isTerminateEarly; 
                bool haveAllReplyAcks;

                lock (this.ThisLock)
                { 
                    isTerminateEarly = !this.connection.Terminate();
                    haveAllReplyAcks = this.requestsByRequestSequenceNumber.Count == 0; 
                } 

                WsrmFault fault = null; 

                if (isTerminateEarly)
                {
                    fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID, 
                        SR.GetString(SR.SequenceTerminatedEarlyTerminateSequence),
                        SR.GetString(SR.EarlyTerminateSequence)); 
                } 
                else if (!haveAllReplyAcks)
                { 
                    fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                        SR.GetString(SR.SequenceTerminatedBeforeReplySequenceAcked),
                        SR.GetString(SR.EarlyRequestTerminateSequence));
                } 

                if (fault != null) 
                { 
                    this.session.OnLocalFault(fault.CreateException(), fault, context);
                    cleanup = false; 
                    return;
                }

                message = WsrmUtilities.CreateTerminateMessage(this.MessageVersion, 
                    this.listener.ReliableMessagingVersion, this.session.OutputID);
                this.AddAcknowledgementHeader(message); 
 
                using (message)
                { 
                    context.Reply(message);
                }
            }
            finally 
            {
                if (cleanup) 
                { 
                    context.RequestMessage.Close();
                    context.Close(); 
                }
            }
        }
 
        void StartReceiving(bool canBlock)
        { 
            while (true) 
            {
                IAsyncResult result = this.binder.BeginTryReceive(TimeSpan.MaxValue, onReceiveCompleted, this); 

                if (!result.CompletedSynchronously)
                {
                    return; 
                }
                if (!canBlock) 
                { 
                    IOThreadScheduler.ScheduleCallback(asyncReceiveComplete, result);
                    return; 
                }
                if (!this.HandleReceiveComplete(result))
                    break;
            } 
        }
 
        void ShutdownCallback(object state) 
        {
            this.Shutdown(); 
        }

        void TerminateSequence(TimeSpan timeout)
        { 
            lock (this.ThisLock)
            { 
                this.ThrowIfClosed(); 
                this.CreateTerminateSequenceReplyHelper();
            } 

            this.terminateSequenceReplyHelper.WaitAndReply(timeout);
            this.OnTerminateSequenceCompleted();
        } 

        IAsyncResult BeginTerminateSequence(TimeSpan timeout, AsyncCallback callback, object state) 
        { 
            lock (this.ThisLock)
            { 
                this.ThrowIfClosed();
                this.CreateTerminateSequenceReplyHelper();
            }
 
            return this.terminateSequenceReplyHelper.BeginWaitAndReply(timeout, callback, state);
        } 
 
        void EndTerminateSequence(IAsyncResult result)
        { 
            this.terminateSequenceReplyHelper.EndWaitAndReply(result);
            this.OnTerminateSequenceCompleted();
        }
 
        void ThrowIfCloseInvalid()
        { 
            bool shouldFault = false; 

            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005) 
            {
                if (this.PendingRequestContexts != 0 || this.connection.Ranges.Count > 1)
                {
                    shouldFault = true; 
                }
            } 
            else if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11) 
            {
                if (this.PendingRequestContexts != 0) 
                {
                    shouldFault = true;
                }
            } 

            if (shouldFault) 
            { 
                WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                    SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone), SR.GetString(SR.SessionClosedBeforeDone)); 
                this.session.OnLocalFault(null, fault, null);
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
            }
        } 

        void UnblockClose() 
        { 
            this.AbortContexts();
 
            if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                this.messagingCompleteWaitObject.Fault(this);
            } 
            else
            { 
                if (this.closeSequenceReplyHelper != null) 
                {
                    this.closeSequenceReplyHelper.Fault(); 
                }
                if (this.terminateSequenceReplyHelper != null)
                {
                    this.terminateSequenceReplyHelper.Fault(); 
                }
            } 
 
            this.connection.Fault(this);
        } 

        class CloseOutputCompletedAsyncResult : CompletedAsyncResult
        {
            public CloseOutputCompletedAsyncResult(AsyncCallback callback, object state) 
                : base(callback, state)
            { 
            } 
        }
 
        class ReliableRequestContext : RequestContextBase
        {
            MessageBuffer bufferedReply;
            ReliableReplySessionChannel channel; 
            List innerContexts = new List();
            bool isLastReply; 
            bool outcomeKnown; 
            SequenceRangeCollection ranges;
            Int64 requestSequenceNumber; 
            Int64 replySequenceNumber;

            public ReliableRequestContext(RequestContext context, Int64 requestSequenceNumber, ReliableReplySessionChannel channel, bool outcome)
                : base(context.RequestMessage, channel.DefaultCloseTimeout, channel.DefaultSendTimeout) 
            {
                this.channel = channel; 
                this.requestSequenceNumber = requestSequenceNumber; 
                this.outcomeKnown = outcome;
                if (!outcome) 
                    this.innerContexts.Add(context);
            }

            public bool CheckForReplyOrAddInnerContext(RequestContext innerContext) 
            {
                lock (this.ThisLock) 
                { 
                    if (this.outcomeKnown)
                        return true; 
                    this.innerContexts.Add(innerContext);
                    return false;
                }
            } 

            public bool HasReply 
            { 
                get
                { 
                    return (this.bufferedReply != null);
                }
            }
 
            public Int64 RequestSequenceNumber
            { 
                get 
                {
                    return this.requestSequenceNumber; 
                }
            }

            void AbortInnerContexts() 
            {
                for (int i = 0; i < this.innerContexts.Count; i++) 
                { 
                    this.innerContexts[i].Abort();
                    this.innerContexts[i].RequestMessage.Close(); 
                }
                this.innerContexts.Clear();
            }
 
            internal IAsyncResult BeginReplyInternal(Message reply, TimeSpan timeout, AsyncCallback callback, object state)
            { 
                bool needAbort = true; 
                bool needReply = true;
 
                try
                {
                    lock (this.ThisLock)
                    { 
                        if (this.ranges == null)
                        { 
                            DiagnosticUtility.DebugAssert("this.ranges != null"); 
                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false);
                        } 

                        if (this.Aborted)
                        {
                            needAbort = false; 
                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.RequestContextAborted)));
                        } 
 
                        if (this.outcomeKnown)
                        { 
                            needAbort = false;
                            needReply = false;
                        }
                        else 
                        {
                            if ((reply != null) && (this.bufferedReply == null)) 
                                this.bufferedReply = reply.CreateBufferedCopy(int.MaxValue); 

                            if (!this.channel.PrepareReply(this)) 
                            {
                                needAbort = false;
                                needReply = false;
                            } 
                            else
                            { 
                                this.outcomeKnown = true; 
                            }
                        } 
                    }

                    if (!needReply)
                        return new ReplyCompletedAsyncResult(callback, state); 

                    IAsyncResult result = new ReplyAsyncResult(this, timeout, callback, state); 
                    needAbort = false; 
                    return result;
                } 
                finally
                {
                    if (needAbort)
                    { 
                        this.AbortInnerContexts();
                        this.Abort(); 
                    } 
                }
            } 

            internal void EndReplyInternal(IAsyncResult result)
            {
                if (result is ReplyCompletedAsyncResult) 
                {
                    ReplyCompletedAsyncResult.End(result); 
                    return; 
                }
 
                bool throwing = true;

                try
                { 
                    ReplyAsyncResult.End(result);
                    this.innerContexts.Clear(); 
                    throwing = false; 
                }
                finally 
                {
                    if (throwing)
                    {
                        this.AbortInnerContexts(); 
                        this.Abort();
                    } 
                } 
            }
 
            protected override void OnAbort()
            {
                bool outcome;
                lock (this.ThisLock) 
                {
                    outcome = this.outcomeKnown; 
                    this.outcomeKnown = true; 
                }
 
                if (!outcome)
                {
                    this.AbortInnerContexts();
                } 

                if (this.channel.ContainsRequest(this.requestSequenceNumber)) 
                { 
                    Exception e = new ProtocolException(SR.GetString(SR.ReliableRequestContextAborted));
                    this.channel.session.OnLocalFault(e, (Message)null, null); 
                }
            }

            protected override IAsyncResult OnBeginReply(Message reply, TimeSpan timeout, AsyncCallback callback, object state) 
            {
                return this.BeginReplyInternal(reply, timeout, callback, state); 
            } 

            protected override void OnClose(TimeSpan timeout) 
            {
                // ReliableRequestContext.Close() relies on base.Close() to call reply if reply is not initiated.
                if (!this.ReplyInitiated)
                    this.OnReply(null, timeout); 
            }
 
            protected override void OnEndReply(IAsyncResult result) 
            {
                this.EndReplyInternal(result); 
            }

            protected override void OnReply(Message reply, TimeSpan timeout)
            { 
                this.ReplyInternal(reply, timeout);
            } 
 
            internal void ReplyInternal(Message reply, TimeSpan timeout)
            { 
                bool needAbort = true;

                try
                { 
                    lock (this.ThisLock)
                    { 
                        if (this.ranges == null) 
                        {
                            DiagnosticUtility.DebugAssert("this.ranges != null"); 
                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false);
                        }

                        if (this.Aborted) 
                        {
                            needAbort = false; 
                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.RequestContextAborted))); 
                        }
 
                        if (this.outcomeKnown)
                        {
                            needAbort = false;
                            return; 
                        }
 
                        if ((reply != null) && (this.bufferedReply == null)) 
                            this.bufferedReply = reply.CreateBufferedCopy(int.MaxValue);
 
                        if (!this.channel.PrepareReply(this))
                        {
                            needAbort = false;
                            return; 
                        }
 
                        this.outcomeKnown = true; 
                    }
 
                    TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                    for (int i = 0; i < this.innerContexts.Count; i++)
                        SendReply(this.innerContexts[i], MaskingMode.Handled, ref timeoutHelper);
                    this.innerContexts.Clear(); 
                    needAbort = false;
                } 
                finally 
                {
                    if (needAbort) 
                    {
                        this.AbortInnerContexts();
                        this.Abort();
                    } 
                }
            } 
 
            public void SetAckRanges(SequenceRangeCollection ranges)
            { 
                if (this.ranges == null)
                    this.ranges = ranges;
            }
 
            public void SetLastReply(Int64 sequenceNumber)
            { 
                this.replySequenceNumber = sequenceNumber; 
                this.isLastReply = true;
                if (this.bufferedReply == null) 
                    this.bufferedReply = Message.CreateMessage(this.channel.MessageVersion, WsrmFeb2005Strings.LastMessageAction).CreateBufferedCopy(int.MaxValue);
            }

            public void SendReply(RequestContext context, MaskingMode maskingMode) 
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(this.DefaultSendTimeout); 
                SendReply(context, maskingMode, ref timeoutHelper); 
            }
 
            void SendReply(RequestContext context, MaskingMode maskingMode, ref TimeoutHelper timeoutHelper)
            {
                Message reply;
 
                if (!this.outcomeKnown)
                { 
                    DiagnosticUtility.DebugAssert("this.outcomeKnown"); 
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false);
                } 

                if (this.bufferedReply != null)
                {
                    reply = this.bufferedReply.CreateMessage(); 
                    this.channel.PrepareReplyMessage(this.replySequenceNumber, this.isLastReply, this.ranges, reply);
                } 
                else 
                {
                    reply = this.channel.CreateAcknowledgement(this.ranges); 
                }
                this.channel.binder.SetMaskingMode(context, maskingMode);

                using (reply) 
                {
                    context.Reply(reply, timeoutHelper.RemainingTime()); 
                } 
                context.Close(timeoutHelper.RemainingTime());
            } 

            public void SetReplySequenceNumber(Int64 sequenceNumber)
            {
                this.replySequenceNumber = sequenceNumber; 
            }
 
            class ReplyCompletedAsyncResult : CompletedAsyncResult 
            {
                public ReplyCompletedAsyncResult(AsyncCallback callback, object state) 
                    : base(callback, state)
                {
                }
            } 

            class ReplyAsyncResult : AsyncResult 
            { 
                ReliableRequestContext context;
                int currentContext; 
                Message reply;
                TimeoutHelper timeoutHelper;
                static AsyncCallback replyCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(ReplyCompleteStatic));
 
                public ReplyAsyncResult(ReliableRequestContext thisContext, TimeSpan timeout, AsyncCallback callback, object state)
                    : base(callback, state) 
                { 
                    this.timeoutHelper = new TimeoutHelper(timeout);
                    this.context = thisContext; 
                    if (this.SendReplies())
                    {
                        this.Complete(true);
                    } 
                }
 
                public static void End(IAsyncResult result) 
                {
                    AsyncResult.End(result); 
                }

                void HandleReplyComplete(IAsyncResult result)
                { 
                    RequestContext thisInnerContext = this.context.innerContexts[this.currentContext];
 
                    try 
                    {
                        thisInnerContext.EndReply(result); 
                        thisInnerContext.Close(this.timeoutHelper.RemainingTime());
                        this.currentContext++;
                    }
                    finally 
                    {
                        this.reply.Close(); 
                        this.reply = null; 
                    }
                } 

                static void ReplyCompleteStatic(IAsyncResult result)
                {
                    if (result.CompletedSynchronously) 
                        return;
 
                    Exception ex = null; 
                    ReplyAsyncResult thisPtr = null;
                    bool complete = false; 

                    try
                    {
                        thisPtr = (ReplyAsyncResult)result.AsyncState; 
                        thisPtr.HandleReplyComplete(result);
                        complete = thisPtr.SendReplies(); 
                    } 
                    catch (Exception e)
                    { 
                        if (DiagnosticUtility.IsFatal(e))
                            throw;
                        ex = e;
                        complete = true; 
                    }
 
                    if (complete) 
                        thisPtr.Complete(false, ex);
                } 

                bool SendReplies()
                {
                    while (this.currentContext < this.context.innerContexts.Count) 
                    {
                        if (this.context.bufferedReply != null) 
                        { 
                            this.reply = this.context.bufferedReply.CreateMessage();
                            this.context.channel.PrepareReplyMessage( 
                                this.context.replySequenceNumber, this.context.isLastReply,
                                this.context.ranges, this.reply);
                        }
                        else 
                        {
                            this.reply = this.context.channel.CreateAcknowledgement(this.context.ranges); 
                        } 

                        RequestContext thisInnerContext = this.context.innerContexts[this.currentContext]; 
                        this.context.channel.binder.SetMaskingMode(thisInnerContext, MaskingMode.Handled);

                        IAsyncResult result = thisInnerContext.BeginReply(this.reply, this.timeoutHelper.RemainingTime(), replyCompleteStatic, this);
 
                        if (!result.CompletedSynchronously)
                            return false; 
 
                        this.HandleReplyComplete(result);
                    } 
                    return true;
                }
            }
        } 

        class ReplyHelper 
        { 
            Message asyncMessage;
            bool canTransfer = true; 
            ReliableReplySessionChannel channel;
            WsrmMessageInfo info;
            ReplyProvider replyProvider;
            RequestContext requestContext; 
            bool throwTimeoutOnWait;
            InterruptibleWaitObject waitHandle; 
 
            internal ReplyHelper(ReliableReplySessionChannel channel, ReplyProvider replyProvider,
                bool throwTimeoutOnWait) 
            {
                this.channel = channel;
                this.replyProvider = replyProvider;
                this.throwTimeoutOnWait = throwTimeoutOnWait; 
            }
 
            object ThisLock 
            {
                get { return this.channel.ThisLock; } 
            }

            internal void Abort()
            { 
                this.Cleanup(true);
            } 
 
            void Cleanup(bool abort)
            { 
                lock (this.ThisLock)
                {
                    this.canTransfer = false;
                } 

                if (this.waitHandle != null) 
                { 
                    if (abort)
                    { 
                        this.waitHandle.Abort(this.channel);
                    }
                    else
                    { 
                        this.waitHandle.Fault(this.channel);
                    } 
                } 
            }
 
            internal void Fault()
            {
                this.Cleanup(false);
            } 

            internal void Reply(RequestContext context, WsrmMessageInfo info, TimeSpan timeout, MaskingMode maskingMode) 
            { 
                using (Message message = this.replyProvider.Provide(this.channel, info))
                { 
                    this.channel.binder.SetMaskingMode(context, maskingMode);
                    context.Reply(message, timeout);
                }
            } 

            IAsyncResult BeginReply(TimeSpan timeout, AsyncCallback callback, object state) 
            { 
                lock (this.ThisLock)
                { 
                    this.canTransfer = false;
                }

                if (this.requestContext == null) 
                {
                    return new ReplyCompletedAsyncResult(callback, state); 
                } 

                this.asyncMessage = this.replyProvider.Provide(this.channel, info); 
                bool throwing = true;

                try
                { 
                    this.channel.binder.SetMaskingMode(this.requestContext, MaskingMode.Handled);
                    IAsyncResult result = this.requestContext.BeginReply(this.asyncMessage, timeout, 
                        callback, state); 
                    throwing = false;
                    return result; 
                }
                finally
                {
                    if (throwing) 
                    {
                        this.asyncMessage.Close(); 
                        this.asyncMessage = null; 
                    }
                } 
            }

            void EndReply(IAsyncResult result)
            { 
                ReplyCompletedAsyncResult completedResult = result as ReplyCompletedAsyncResult;
                if (completedResult != null) 
                { 
                    completedResult.End();
                    return; 
                }

                try
                { 
                    this.requestContext.EndReply(result);
                } 
                finally 
                {
                    if (this.asyncMessage != null) 
                    {
                        this.asyncMessage.Close();
                    }
                } 
            }
 
            internal bool TransferRequestContext(RequestContext requestContext, WsrmMessageInfo info) 
            {
                RequestContext oldContext = null; 
                WsrmMessageInfo oldInfo = null;
                InterruptibleWaitObject waitHandle = null;

                lock (this.ThisLock) 
                {
                    if (!this.canTransfer) 
                    { 
                        return false;
                    } 
                    else
                    {
                        oldContext = this.requestContext;
                        oldInfo = this.info; 
                        this.requestContext = requestContext;
                        this.info = info; 
 
                        waitHandle = this.waitHandle;
                    } 
                }

                if (waitHandle != null)
                { 
                    waitHandle.Set();
                } 
 
                if (oldContext != null)
                { 
                    oldInfo.Message.Close();
                    oldContext.Close();
                }
 
                return true;
            } 
 
            internal void UnblockWaiter()
            { 
                this.TransferRequestContext(null, null);
            }

            internal void WaitAndReply(TimeSpan timeout) 
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 
 
                lock (this.ThisLock)
                { 
                    if (this.requestContext != null)
                    {
                        this.canTransfer = false;
                    } 
                    else
                    { 
                        this.waitHandle = new InterruptibleWaitObject(false, this.throwTimeoutOnWait); 
                    }
                } 

                if (this.waitHandle != null)
                {
                    this.waitHandle.Wait(timeoutHelper.RemainingTime()); 

                    lock (this.ThisLock) 
                    { 
                        this.canTransfer = false;
 
                        if (this.requestContext == null)
                        {
                            return;
                        } 
                    }
                } 
 
                this.Reply(this.requestContext, this.info, timeoutHelper.RemainingTime(),
                    MaskingMode.Handled); 
            }

            internal IAsyncResult BeginWaitAndReply(TimeSpan timeout, AsyncCallback callback, object state)
            { 
                lock (this.ThisLock)
                { 
                    if (this.requestContext == null) 
                    {
                        this.waitHandle = new InterruptibleWaitObject(false, this.throwTimeoutOnWait); 
                    }
                }

                OperationWithTimeoutBeginCallback[] beginOperations = new OperationWithTimeoutBeginCallback[] { 
                    this.waitHandle != null
                    ? new OperationWithTimeoutBeginCallback (this.waitHandle.BeginWait) 
                    : default(OperationWithTimeoutBeginCallback), 
                    new OperationWithTimeoutBeginCallback (this.BeginReply),
                }; 

                OperationEndCallback[] endOperations = new OperationEndCallback[] {
                    this.waitHandle != null
                    ? new OperationEndCallback (this.waitHandle.EndWait) 
                    : default(OperationEndCallback),
                    new OperationEndCallback(this.EndReply), 
                }; 

                return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout, beginOperations, 
                    endOperations, callback, state);
            }

            internal void EndWaitAndReply(IAsyncResult result) 
            {
                OperationWithTimeoutComposer.EndComposeAsyncOperations(result); 
            } 

            class ReplyCompletedAsyncResult : CompletedAsyncResult 
            {
                internal ReplyCompletedAsyncResult(AsyncCallback callback, object state)
                    : base(callback, state)
                { 
                }
 
                public void End() 
                {
                    AsyncResult.End(this); 
                }
            }
        }
 
        abstract class ReplyProvider
        { 
            internal abstract Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo info); 
        }
 
        class CloseSequenceReplyProvider : ReplyProvider
        {
            static CloseSequenceReplyProvider instance = new CloseSequenceReplyProvider();
 
            CloseSequenceReplyProvider()
            { 
            } 

            static internal ReplyProvider Instance 
            {
                get
                {
                    if (instance == null) 
                    {
                        instance = new CloseSequenceReplyProvider(); 
                    } 

                    return instance; 
                }
            }

            internal override Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo requestInfo) 
            {
                Message message = WsrmUtilities.CreateCloseSequenceResponse(channel.MessageVersion, 
                   requestInfo.CloseSequenceInfo.MessageId, channel.session.InputID); 
                channel.AddAcknowledgementHeader(message);
                return message; 
            }
        }

        class TerminateSequenceReplyProvider : ReplyProvider 
        {
            static TerminateSequenceReplyProvider instance = new TerminateSequenceReplyProvider(); 
 
            TerminateSequenceReplyProvider()
            { 
            }

            static internal ReplyProvider Instance
            { 
                get
                { 
                    if (instance == null) 
                    {
                        instance = new TerminateSequenceReplyProvider(); 
                    }

                    return instance;
                } 
            }
 
            internal override Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo requestInfo) 
            {
                Message message = WsrmUtilities.CreateTerminateResponseMessage(channel.MessageVersion, 
                   requestInfo.TerminateSequenceInfo.MessageId, channel.session.InputID);
                channel.AddAcknowledgementHeader(message);
                return message;
            } 
        }
    } 
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK