// MsmqReceiveHelper.cs source code in C# .NET
//
// Source code for the .NET framework in C#
//
// Code:
//
// / WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / MsmqReceiveHelper.cs / 1 / MsmqReceiveHelper.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{ 
    using System.ServiceModel.Diagnostics;
    using System.Threading; 
    using System.Transactions; 

    sealed class MsmqReceiveHelper 
    {
        // Poison-message strategy; created in the constructor, opened in Open() and disposed in Close().
        IPoisonHandlingStrategy poisonHandler;
        // Result of AddressTranslator.UriToFormatName(uri), computed in the constructor.
        string queueName;
        // Queue the helper peeks and receives from; opened in Open() and disposed in Close().
        MsmqQueue queue; 
        // Receive settings supplied by the caller; ExactlyOnce selects the transactional code paths.
        MsmqReceiveParameters receiveParameters;
        // Listen URI this helper was created for.
        Uri uri; 
        // Upper-cased (invariant) string form of the listen URI, passed to diagnostics calls.
        string instanceId; 
        // Pool that TakeMessage()/ReturnMessage() delegate to.
        IMsmqMessagePool pool;
        // Channel passed to the constructor; exposed through the Channel property.
        MsmqInputChannelBase channel; 
        // Listener passed to the constructor; exposed through the ChannelListener property.
        MsmqChannelListenerBase listener;
        // Activity returned by MsmqDiagnostics.StartListenAtActivity in Open(); stopped in Close().
        ServiceModelActivity activity;

        /// <summary>
        /// Creates a receive helper for the queue addressed by <paramref name="uri"/>.
        /// </summary>
        /// <param name="receiveParameters">Receive settings; its AddressTranslator converts the URI to a format name.</param>
        /// <param name="uri">Listen URI of the queue.</param>
        /// <param name="messagePool">Pool used by <see cref="TakeMessage"/> and <see cref="ReturnMessage"/>.</param>
        /// <param name="channel">Channel exposed through <see cref="Channel"/>.</param>
        /// <param name="listener">Listener exposed through <see cref="ChannelListener"/>.</param>
        internal MsmqReceiveHelper(MsmqReceiveParameters receiveParameters, Uri uri, IMsmqMessagePool messagePool, MsmqInputChannelBase channel, MsmqChannelListenerBase listener)
        {
            this.receiveParameters = receiveParameters;
            this.uri = uri;
            this.pool = messagePool;
            this.channel = channel;
            this.listener = listener;

            // Translate the URI once and reuse the format name for the queue.
            this.queueName = receiveParameters.AddressTranslator.UriToFormatName(uri);
            this.queue = new MsmqQueue(this.queueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS);
            this.instanceId = uri.ToString().ToUpperInvariant();

            // Hand out "this" last so the poison handler never observes a partially
            // initialized helper (Channel/ChannelListener were previously still null here).
            this.poisonHandler = Msmq.CreatePoisonHandler(this);
        }
 
        /// <summary>Gets the activity assigned by <see cref="Open"/>; the field is not set anywhere else.</summary>
        internal ServiceModelActivity Activity
        {
            get
            {
                return activity;
            }
        }

        /// <summary>Gets the poison-message strategy created in the constructor.</summary>
        IPoisonHandlingStrategy PoisonHandler
        {
            get
            {
                return poisonHandler;
            }
        }

        /// <summary>Gets the receive parameters supplied to the constructor.</summary>
        internal MsmqReceiveParameters MsmqReceiveParameters
        {
            get
            {
                return receiveParameters;
            }
        }

        /// <summary>Gets the channel supplied to the constructor.</summary>
        internal MsmqInputChannelBase Channel
        {
            get
            {
                return channel;
            }
        }

        /// <summary>Gets the channel listener supplied to the constructor.</summary>
        internal MsmqChannelListenerBase ChannelListener
        {
            get
            {
                return listener;
            }
        }

        /// <summary>Gets the URI supplied to the constructor.</summary>
        internal Uri ListenUri
        {
            get
            {
                return uri;
            }
        }

        /// <summary>Gets the upper-cased (invariant) string form of the listen URI.</summary>
        internal string InstanceId
        {
            get
            {
                return instanceId;
            }
        }

        /// <summary>Gets the queue created in the constructor.</summary>
        internal MsmqQueue Queue
        {
            get
            {
                return queue;
            }
        }

        /// <summary>Gets the value of <c>ExactlyOnce</c> from the receive parameters.</summary>
        internal bool Transactional
        {
            get
            {
                return receiveParameters.ExactlyOnce;
            }
        }
 
        /// <summary>
        /// Starts the listen activity, then opens the queue and the poison handler
        /// inside the diagnostics scope returned by <c>MsmqDiagnostics.BoundOpenOperation</c>.
        /// </summary>
        internal void Open()
        {
            activity = MsmqDiagnostics.StartListenAtActivity(this);

            using (MsmqDiagnostics.BoundOpenOperation(this))
            {
                // Queue first, poison handler second.
                Queue.EnsureOpen();
                PoisonHandler.Open();
            }
        }

        /// <summary>
        /// Disposes the poison handler and the queue, then stops the listen activity.
        /// </summary>
        /// <remarks>
        /// Cleanup is chained with try/finally so that a failure in an earlier step
        /// (for example the poison handler's Dispose) no longer leaves the queue
        /// undisposed or the activity running. The first exception still propagates.
        /// </remarks>
        internal void Close()
        {
            try
            {
                using (ServiceModelActivity.BoundOperation(this.Activity))
                {
                    try
                    {
                        this.poisonHandler.Dispose();
                    }
                    finally
                    {
                        // Always release the queue, even if the poison handler failed to dispose.
                        this.queue.Dispose();
                    }
                }
            }
            finally
            {
                // Stop the activity after the bound-operation scope has been left, as before.
                ServiceModelActivity.Stop(this.activity);
            }
        }
 
        /// <summary>Obtains an input message from the message pool.</summary>
        /// <returns>Whatever the pool's <c>TakeMessage</c> returns.</returns>
        internal MsmqInputMessage TakeMessage()
        {
            MsmqInputMessage pooled = pool.TakeMessage();
            return pooled;
        }

        /// <summary>Hands <paramref name="message"/> back to the message pool.</summary>
        /// <param name="message">Message to return; forwarded to the pool unchanged.</param>
        internal void ReturnMessage(MsmqInputMessage message)
        {
            IMsmqMessagePool target = pool;
            target.ReturnMessage(message);
        }

        /// <summary>
        /// Rolls back the ambient transaction, if there is one.
        /// </summary>
        /// <remarks>
        /// <see cref="TransactionAbortedException"/> and <see cref="ObjectDisposedException"/>
        /// raised by the rollback are reported through <c>MsmqDiagnostics.ExpectedException</c>
        /// and not rethrown; any other exception propagates.
        /// </remarks>
        internal static void TryAbortTransactionCurrent()
        {
            Transaction ambient = Transaction.Current;
            if (ambient == null)
            {
                return;
            }

            try
            {
                ambient.Rollback();
            }
            catch (TransactionAbortedException alreadyAborted)
            {
                MsmqDiagnostics.ExpectedException(alreadyAborted);
            }
            catch (ObjectDisposedException alreadyDisposed)
            {
                MsmqDiagnostics.ExpectedException(alreadyDisposed);
            }
        }

        /// <summary>
        /// Removes an already-received message from the queue, optionally marking it rejected.
        /// </summary>
        /// <param name="messageProperty">Supplies the LookupId and MessageId of the message.</param>
        /// <param name="reject">
        /// When true and the receive succeeds, the message is marked rejected before the
        /// new transaction scope completes.
        /// </param>
        /// <remarks>
        /// For a non-transactional helper only the consumed-message trace is emitted.
        /// For a transactional helper the ambient transaction is rolled back and the message
        /// is received by lookup id in a new transaction scope; the attempt is repeated while
        /// the queue reports the message as locked under a transaction and the retry
        /// strategy allows another try.
        /// </remarks>
        internal void DropOrRejectReceivedMessage(MsmqMessageProperty messageProperty, bool reject)
        {
            if (!this.Transactional)
            {
                MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, false);
                return;
            }

            TryAbortTransactionCurrent();
            IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(messageProperty.LookupId);
            MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown;
            do
            {
                using (MsmqEmptyMessage emptyMessage = new MsmqEmptyMessage())
                {
                    using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                    {
                        result = this.Queue.TryReceiveByLookupId(messageProperty.LookupId, emptyMessage, MsmqTransactionMode.CurrentOrThrow);
                        if (MsmqQueue.MoveReceiveResult.Succeeded == result && reject)
                        {
                            this.Queue.MarkMessageRejected(messageProperty.LookupId);
                        }
                        scope.Complete();
                    }
                }

                if (result == MsmqQueue.MoveReceiveResult.Succeeded)
                {
                    // Report a reject only when one was both requested and supported; a plain
                    // drop previously traced as a reject whenever the platform supported rejects.
                    // NOTE(review): assumes the third argument means "rejected" (the
                    // non-transactional path passes false) - confirm against MsmqDiagnostics.
                    MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, Msmq.IsRejectMessageSupported && reject);
                }

                if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction)
                {
                    break;
                }
            }
            while (postRollback.AnotherTryNeeded());
        }
        // 
        /// <summary>
        /// Rolls back the ambient transaction and moves the message identified by
        /// <paramref name="lookupId"/> from <paramref name="queueFrom"/> to <paramref name="queueTo"/>.
        /// </summary>
        /// <param name="queueFrom">Queue currently holding the message.</param>
        /// <param name="queueTo">Destination queue.</param>
        /// <param name="lookupId">Lookup id of the message to move.</param>
        /// <remarks>
        /// The move is retried only while the source queue reports the message as locked
        /// under a transaction and the retry strategy agrees to another attempt.
        /// </remarks>
        internal static void MoveReceivedMessage(MsmqQueue queueFrom, MsmqQueue queueTo, long lookupId)
        {
            TryAbortTransactionCurrent();

            IPostRollbackErrorStrategy retryPolicy = new SimplePostRollbackErrorStrategy(lookupId);
            for (;;)
            {
                MsmqQueue.MoveReceiveResult outcome = queueFrom.TryMoveMessage(lookupId, queueTo, MsmqTransactionMode.Single);
                bool stillLocked = outcome == MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction;

                // The retry policy is consulted only when the message is still locked.
                if (!stillLocked || !retryPolicy.AnotherTryNeeded())
                {
                    return;
                }
            }
        }
 
        /// <summary>Forwards <paramref name="messageProperty"/> to the poison handler's <c>FinalDisposition</c>.</summary>
        /// <param name="messageProperty">Property of the message being disposed of.</param>
        internal void FinalDisposition(MsmqMessageProperty messageProperty)
        {
            IPoisonHandlingStrategy handler = PoisonHandler;
            handler.FinalDisposition(messageProperty);
        }

        // WaitForMessage
        /// <summary>
        /// Peeks the queue for up to <paramref name="timeout"/>.
        /// </summary>
        /// <param name="timeout">Maximum time handed to the queue's <c>TryPeek</c>.</param>
        /// <returns>false when the peek reports Timeout; true for every other result.</returns>
        internal bool WaitForMessage(TimeSpan timeout)
        {
            using (MsmqEmptyMessage probe = new MsmqEmptyMessage())
            {
                MsmqQueue.ReceiveResult peekOutcome = this.queue.TryPeek(probe, timeout);
                return peekOutcome != MsmqQueue.ReceiveResult.Timeout;
            }
        }
        //
        /// <summary>Starts an asynchronous peek on the queue.</summary>
        /// <param name="timeout">Maximum time to wait for a message.</param>
        /// <param name="callback">Callback passed to the async result.</param>
        /// <param name="state">Caller state passed to the async result.</param>
        /// <returns>The async result to pass to <see cref="EndWaitForMessage"/>.</returns>
        internal IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult pendingPeek = new WaitForMessageAsyncResult(Queue, timeout, callback, state);
            return pendingPeek;
        }
        //
        /// <summary>Completes an operation started by <see cref="BeginWaitForMessage"/>.</summary>
        /// <param name="result">Async result returned by <see cref="BeginWaitForMessage"/>.</param>
        /// <returns>The value produced by <c>WaitForMessageAsyncResult.End</c>.</returns>
        public bool EndWaitForMessage(IAsyncResult result)
        {
            bool messageAvailable = WaitForMessageAsyncResult.End(result);
            return messageAvailable;
        }
 
        /// <summary>
        /// Receives a message from the queue into <paramref name="msmqMessage"/>.
        /// </summary>
        /// <param name="msmqMessage">Buffer the queue receives into.</param>
        /// <param name="timeout">Maximum time handed to the queue's <c>TryReceive</c>.</param>
        /// <param name="transactionMode">Transaction mode handed to the queue's <c>TryReceive</c>.</param>
        /// <param name="property">
        /// Set to a new <c>MsmqMessageProperty</c> for the received message; null on
        /// timeout, cancellation, or when a poison message is detected.
        /// </param>
        /// <returns>false when the receive timed out; true otherwise (including cancellation).</returns>
        /// <remarks>
        /// For a transactional helper the poison handler inspects the message first; when it
        /// reports the message as poison, an <c>MsmqPoisonMessageException</c> carrying the
        /// lookup id is thrown through the critical throw helper.
        /// </remarks>
        internal bool TryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, out MsmqMessageProperty property)
        {
            property = null;

            MsmqQueue.ReceiveResult outcome = this.Queue.TryReceive(msmqMessage, timeout, transactionMode);
            if (outcome == MsmqQueue.ReceiveResult.OperationCancelled)
            {
                return true;
            }
            if (outcome == MsmqQueue.ReceiveResult.Timeout)
            {
                return false;
            }

            // Publish the property before the poison check, exactly as the original ordering did.
            MsmqMessageProperty received = new MsmqMessageProperty(msmqMessage);
            property = received;

            if (this.Transactional && this.PoisonHandler.CheckAndHandlePoisonMessage(received))
            {
                long lookupId = received.LookupId;
                property = null;
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new MsmqPoisonMessageException(lookupId));
            }

            return true;
        }
        //
        /// <summary>
        /// Starts an asynchronous receive, choosing the transacted or non-transacted
        /// async result according to the receive parameters' <c>ExactlyOnce</c> setting.
        /// </summary>
        /// <param name="msmqMessage">Buffer the receive fills.</param>
        /// <param name="timeout">Maximum time to wait for a message.</param>
        /// <param name="transactionMode">Used only by the transacted variant.</param>
        /// <param name="callback">Callback passed to the async result.</param>
        /// <param name="state">Caller state passed to the async result.</param>
        /// <returns>The async result to pass to <see cref="EndTryReceive"/>.</returns>
        internal IAsyncResult BeginTryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, AsyncCallback callback, object state)
        {
            if (!this.receiveParameters.ExactlyOnce)
            {
                return new TryNonTransactedReceiveAsyncResult(this, msmqMessage, timeout, callback, state);
            }

            return new TryTransactedReceiveAsyncResult(this, msmqMessage, timeout, transactionMode, callback, state);
        }
        //
        /// <summary>
        /// Completes an operation started by <see cref="BeginTryReceive"/>.
        /// </summary>
        /// <param name="result">Async result returned by <see cref="BeginTryReceive"/>.</param>
        /// <param name="msmqMessage">Receives the message buffer held by the async result.</param>
        /// <param name="msmqProperty">Receives the message property produced by the async result, if any.</param>
        /// <returns>The value returned by the matching async result's <c>End</c>.</returns>
        /// <remarks>
        /// Throws through the argument-null helper when <paramref name="result"/> is null, and
        /// through the argument helper (SR.InvalidAsyncResult) when it is not the async result
        /// type that matches the <c>ExactlyOnce</c> setting.
        /// </remarks>
        internal bool EndTryReceive(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty msmqProperty)
        {
            msmqMessage = null;
            msmqProperty = null;

            if (result == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
            }

            if (!this.receiveParameters.ExactlyOnce)
            {
                TryNonTransactedReceiveAsyncResult nonTransacted = result as TryNonTransactedReceiveAsyncResult;
                if (nonTransacted == null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult));
                }
                return TryNonTransactedReceiveAsyncResult.End(nonTransacted, out msmqMessage, out msmqProperty);
            }

            TryTransactedReceiveAsyncResult transacted = result as TryTransactedReceiveAsyncResult;
            if (transacted == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult));
            }
            return TryTransactedReceiveAsyncResult.End(transacted, out msmqMessage, out msmqProperty);
        }
 
        // TryReceiveAsyncResult (tx version)
        /// <summary>
        /// Async result for a transacted receive. The constructor captures the ambient
        /// transaction and schedules <see cref="OnComplete"/> through IOThreadScheduler;
        /// the callback re-installs that transaction around the receiver's TryReceive.
        /// </summary>
        class TryTransactedReceiveAsyncResult : AsyncResult
        {
            // True when TryReceive returned false (the receive timed out).
            bool expired;
            MsmqReceiveHelper receiver;
            // Started in the constructor so time spent waiting to be scheduled counts against the timeout.
            TimeoutHelper timeoutHelper;
            // Ambient transaction of the thread that started the operation.
            Transaction txCurrent;
            MsmqInputMessage msmqMessage;
            MsmqMessageProperty messageProperty;
            MsmqTransactionMode transactionMode;
            static WaitCallback onComplete = OnComplete;

            internal TryTransactedReceiveAsyncResult(
                MsmqReceiveHelper receiver,
                MsmqInputMessage msmqMessage,
                TimeSpan timeout,
                MsmqTransactionMode transactionMode,
                AsyncCallback callback,
                object state)
                : base(callback, state)
            {
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.txCurrent = Transaction.Current;
                this.receiver = receiver;
                this.msmqMessage = msmqMessage;
                this.transactionMode = transactionMode;
                IOThreadScheduler.ScheduleCallback(onComplete, this);
            }

            /// <summary>
            /// Scheduled callback: performs the receive under the captured transaction and completes the result.
            /// </summary>
            static void OnComplete(object parameter)
            {
                // Direct cast instead of "as": a state object of the wrong type now fails
                // right here with InvalidCastException instead of a later NullReferenceException.
                TryTransactedReceiveAsyncResult result = (TryTransactedReceiveAsyncResult)parameter;
                Transaction savedTransaction = Transaction.Current;
                Transaction.Current = result.txCurrent;
                try
                {
                    Exception ex = null;
                    try
                    {
                        result.expired = !result.receiver.TryReceive(result.msmqMessage, result.timeoutHelper.RemainingTime(), result.transactionMode, out result.messageProperty);
                    }
                    catch (Exception e)
                    {
                        // Fatal exceptions are never captured into the async result.
                        if (DiagnosticUtility.IsFatal(e))
                            throw;
                        ex = e;
                    }
                    result.Complete(false, ex);
                }
                finally
                {
                    // Restore whatever transaction the scheduler thread had before.
                    Transaction.Current = savedTransaction;
                }
            }

            /// <summary>
            /// Completes the operation and returns the received message and its property.
            /// </summary>
            /// <returns>true unless the receive timed out.</returns>
            internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property)
            {
                // The explicit type argument is required: C# cannot infer a generic method's
                // type parameter from the assignment target.
                TryTransactedReceiveAsyncResult receiveResult = AsyncResult.End<TryTransactedReceiveAsyncResult>(result);
                msmqMessage = receiveResult.msmqMessage;
                property = receiveResult.messageProperty;
                return !receiveResult.expired;
            }
        }

        // TryReceiveAsyncResult (non-tx version)
        /// <summary>
        /// Async result for a non-transacted receive; wraps the queue's
        /// BeginTryReceive/EndTryReceive pair.
        /// </summary>
        class TryNonTransactedReceiveAsyncResult : AsyncResult
        {
            // Outcome reported by the queue's EndTryReceive.
            MsmqQueue.ReceiveResult receiveResult;
            MsmqReceiveHelper receiver;
            MsmqInputMessage msmqMessage;
            static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic));

            internal TryNonTransactedReceiveAsyncResult(MsmqReceiveHelper receiver, MsmqInputMessage msmqMessage, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.receiver = receiver;
                this.msmqMessage = msmqMessage;
                receiver.Queue.BeginTryReceive(msmqMessage, timeout, onCompleteStatic, this);
            }

            static void OnCompleteStatic(IAsyncResult result)
            {
                // Direct cast (matching WaitForMessageAsyncResult) rather than an unchecked "as".
                ((TryNonTransactedReceiveAsyncResult)result.AsyncState).OnComplete(result);
            }

            /// <summary>Ends the queue receive and completes this result with any non-fatal exception.</summary>
            void OnComplete(IAsyncResult result)
            {
                Exception ex = null;
                try
                {
                    receiveResult = receiver.Queue.EndTryReceive(result);
                }
                catch (Exception e)
                {
                    // Fatal exceptions are never captured into the async result.
                    if (DiagnosticUtility.IsFatal(e))
                        throw;
                    ex = e;
                }
                Complete(result.CompletedSynchronously, ex);
            }

            /// <summary>
            /// Completes the operation and returns the message buffer and, for a real
            /// receive, a new message property.
            /// </summary>
            /// <returns>false on timeout; true otherwise (property stays null on cancellation).</returns>
            internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property)
            {
                // The explicit type argument is required: C# cannot infer a generic method's
                // type parameter from the assignment target.
                TryNonTransactedReceiveAsyncResult asyncResult = AsyncResult.End<TryNonTransactedReceiveAsyncResult>(result);
                msmqMessage = asyncResult.msmqMessage;
                property = null;
                if (MsmqQueue.ReceiveResult.Timeout == asyncResult.receiveResult)
                    return false;
                else if (MsmqQueue.ReceiveResult.OperationCancelled == asyncResult.receiveResult)
                    return true;
                else
                {
                    property = new MsmqMessageProperty(msmqMessage);
                    return true;
                }
            }
        }
 
        // WaitForMessageAsyncResult
        /// <summary>
        /// Async result for an asynchronous peek. It completes with true unless the peek
        /// reported Timeout.
        /// </summary>
        /// <remarks>
        /// The base type carries a bool payload: <see cref="OnComplete"/> calls
        /// Complete(bool data, bool completedSynchronously) and the inherited End is
        /// consumed as a bool by EndWaitForMessage.
        /// </remarks>
        class WaitForMessageAsyncResult : TypedAsyncResult<bool>
        {
            MsmqQueue msmqQueue;
            // Scratch message handed to BeginPeek; disposed as soon as the peek calls back.
            MsmqEmptyMessage msmqMessage;
            static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic));

            public WaitForMessageAsyncResult(MsmqQueue msmqQueue, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.msmqMessage = new MsmqEmptyMessage();
                this.msmqQueue = msmqQueue;
                this.msmqQueue.BeginPeek(this.msmqMessage, timeout, onCompleteStatic, this);
            }

            static void OnCompleteStatic(IAsyncResult result)
            {
                ((WaitForMessageAsyncResult)result.AsyncState).OnComplete(result);
            }

            /// <summary>Ends the peek and completes this result with its outcome or the captured exception.</summary>
            void OnComplete(IAsyncResult result)
            {
                this.msmqMessage.Dispose();
                MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown;
                Exception ex = null;
                try
                {
                    receiveResult = this.msmqQueue.EndPeek(result);
                }
                catch (Exception e)
                {
                    // Fatal exceptions are never captured into the async result.
                    if (DiagnosticUtility.IsFatal(e))
                        throw;
                    ex = e;
                }
                if (null == ex)
                    Complete(receiveResult != MsmqQueue.ReceiveResult.Timeout, result.CompletedSynchronously);
                else
                    Complete(result.CompletedSynchronously, ex);
            }
        }
 
        // PostRollbackErrorStrategy 
        // Retry policy consulted by DropOrRejectReceivedMessage and MoveReceivedMessage
        // after an attempt finds the message locked under a transaction.
        interface IPostRollbackErrorStrategy
        { 
            // Returns true when the caller should attempt the operation again.
            bool AnotherTryNeeded();
        }

        // SimplePostRollbackErrorStrategy
        /// <summary>
        /// Retry policy with a fixed budget: each granted retry sleeps for
        /// <c>MillisecondsToSleep</c> milliseconds first, and the budget starts at <c>Attempts</c>.
        /// </summary>
        class SimplePostRollbackErrorStrategy : IPostRollbackErrorStrategy
        {
            const int Attempts = 50;
            const int MillisecondsToSleep = 100;

            int attemptsLeft = Attempts;
            long lookupId;

            internal SimplePostRollbackErrorStrategy(long lookupId)
            {
                this.lookupId = lookupId;
            }

            /// <summary>
            /// Consumes one attempt from the budget.
            /// </summary>
            /// <returns>
            /// true (after sleeping) while attempts remain; false once the budget is used up,
            /// after reporting the failure to diagnostics.
            /// </returns>
            bool IPostRollbackErrorStrategy.AnotherTryNeeded()
            {
                this.attemptsLeft--;

                if (this.attemptsLeft <= 0)
                {
                    MsmqDiagnostics.MoveOrDeleteAttemptFailed(this.lookupId);
                    return false;
                }

                // Trace the locked message only on the first retry.
                bool firstRetry = this.attemptsLeft == Attempts - 1;
                if (firstRetry)
                {
                    MsmqDiagnostics.MessageLockedUnderTheTransaction(this.lookupId);
                }

                Thread.Sleep(MillisecondsToSleep);
                return true;
            }
        }
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

// Link Menu
//
// Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
// This book is available now!
// Buy at Amazon US or
// Buy at Amazon UK