OrderPreservingPipeliningSpoolingTask.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / OrderPreservingPipeliningSpoolingTask.cs / 1305376 / OrderPreservingPipeliningSpoolingTask.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// OrderPreservingPipeliningSpoolingTask.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System; 
using System.Collections.Generic;
using System.Linq; 
using System.Linq.Parallel; 
using System.Text;
using System.Threading; 
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel 
{
    /// <summary>
    /// Producer-side spooling task for an order-preserving, pipelined merge. Each task drains one
    /// source partition and enqueues its (key, element) pairs into that partition's queue in a
    /// shared buffer array, coordinating with the consumer through per-partition flag arrays and
    /// Monitor.Wait / Monitor.Pulse on the partition's buffer lock.
    /// </summary>
    /// <typeparam name="TOutput">The type of the elements produced by the source partition.</typeparam>
    class OrderPreservingPipeliningSpoolingTask<TOutput> : SpoolingTaskBase
    {
        private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
        private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query.
        private readonly QueryOperatorEnumerator<TOutput, int> m_partition; // The source partition.
        private readonly bool[] m_consumerWaiting; // Whether a consumer is waiting on a particular producer
        private readonly bool[] m_producerWaiting; // Whether a particular producer is waiting on the consumer
        private readonly bool[] m_producerDone; // Whether each producer is done
        private readonly int m_partitionIndex; // Index of the partition owned by this task.

        private readonly Queue<Pair<int, TOutput>>[] m_buffers; // The buffer for the results
        private readonly object m_bufferLock; // A lock for the buffer

        /// <summary>
        /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer.
        /// If false, the producer will make each result available to the consumer immediately after it is
        /// produced.
        /// </summary>
        private readonly bool m_autoBuffered;

        /// <summary>
        /// The number of elements to accumulate on the producer before copying the elements to the
        /// producer-consumer buffer. This constant is only used in the AutoBuffered mode.
        ///
        /// Experimentally, 16 appears to be sufficient buffer size to compensate for the synchronization
        /// cost.
        /// </summary>
        private const int PRODUCER_BUFFER_AUTO_SIZE = 16;

        /// <summary>
        /// Constructor. Stores the partition to drain and the state shared with the consumer.
        /// </summary>
        /// <param name="partition">The source partition this task enumerates.</param>
        /// <param name="taskGroupState">State shared among the tasks of the query.</param>
        /// <param name="consumerWaiting">Per-partition flags: the consumer is waiting on that producer.</param>
        /// <param name="producerWaiting">Per-partition flags: that producer is waiting on the consumer.</param>
        /// <param name="producerDone">Per-partition flags: that producer has finished.</param>
        /// <param name="partitionIndex">Index of the partition (and of the flag/buffer slots) owned by this task.</param>
        /// <param name="buffers">Per-partition result queues; this task enqueues only into its own slot.</param>
        /// <param name="bufferLock">The lock guarding this partition's queue and flags.</param>
        /// <param name="taskScheduler">The scheduler associated with the query.</param>
        /// <param name="autoBuffered">Whether results are handed over in chunks rather than one at a time.</param>
        internal OrderPreservingPipeliningSpoolingTask(
            QueryOperatorEnumerator<TOutput, int> partition,
            QueryTaskGroupState taskGroupState,
            bool[] consumerWaiting,
            bool[] producerWaiting,
            bool[] producerDone,
            int partitionIndex,
            Queue<Pair<int, TOutput>>[] buffers,
            object bufferLock,
            TaskScheduler taskScheduler,
            bool autoBuffered)
            : base(partitionIndex, taskGroupState)
        {
            Contract.Assert(partition != null);
            Contract.Assert(taskGroupState != null);
            Contract.Assert(consumerWaiting != null);
            Contract.Assert(producerWaiting != null && producerWaiting.Length == consumerWaiting.Length);
            Contract.Assert(producerDone != null && producerDone.Length == consumerWaiting.Length);
            Contract.Assert(buffers != null && buffers.Length == consumerWaiting.Length);
            Contract.Assert(partitionIndex >= 0 && partitionIndex < consumerWaiting.Length);
            // The lock is used by every lock/Wait/Pulse below, so a null here can never work.
            Contract.Assert(bufferLock != null);

            m_partition = partition;
            m_taskGroupState = taskGroupState;
            m_producerDone = producerDone;
            m_consumerWaiting = consumerWaiting;
            m_producerWaiting = producerWaiting;
            m_partitionIndex = partitionIndex;
            m_buffers = buffers;
            m_bufferLock = bufferLock;
            m_taskScheduler = taskScheduler;
            m_autoBuffered = autoBuffered;
        }

        /// <summary>
        /// This method is responsible for enumerating results and enqueueing them to
        /// the output buffer as appropriate.  Each derived class implements its own.
        ///
        /// Elements are first gathered into a local chunk without holding the lock (16 elements in
        /// auto-buffered mode, otherwise 1), then copied into the shared queue under the lock.
        /// </summary>
        protected override void SpoolingWork()
        {
            TOutput element = default(TOutput);
            int key = default(int);

            int chunkSize = m_autoBuffered ? PRODUCER_BUFFER_AUTO_SIZE : 1;
            Pair<int, TOutput>[] chunk = new Pair<int, TOutput>[chunkSize];
            var partition = m_partition;
            CancellationToken cancelToken = m_taskGroupState.CancellationState.MergedCancellationToken;

            int lastChunkSize;
            do
            {
                // Fill the local chunk outside the lock; stop early if the partition runs dry.
                lastChunkSize = 0;
                while (lastChunkSize < chunkSize && partition.MoveNext(ref element, ref key))
                {
                    chunk[lastChunkSize] = new Pair<int, TOutput>(key, element);
                    lastChunkSize++;
                }

                // Nothing was produced this round: the partition is exhausted.
                if (lastChunkSize == 0) break;

                lock (m_bufferLock)
                {
                    // Check if the query has been cancelled.
                    // (This break leaves the enclosing do-while; the lock is released on the way out.)
                    if (cancelToken.IsCancellationRequested)
                    {
                        break;
                    }

                    for (int i = 0; i < lastChunkSize; i++)
                    {
                        m_buffers[m_partitionIndex].Enqueue(chunk[i]);
                    }

                    // Wake the consumer if it is blocked waiting for results from this producer.
                    if (m_consumerWaiting[m_partitionIndex])
                    {
                        Monitor.Pulse(m_bufferLock);
                        m_consumerWaiting[m_partitionIndex] = false;
                    }

                    // If the producer buffer is too large, wait.
                    // Note: we already checked for cancellation after acquiring the lock on this producer.
                    // That guarantees that the consumer will eventually wake up the producer.
                    if (m_buffers[m_partitionIndex].Count >= OrderPreservingPipeliningMergeHelper.MAX_BUFFER_SIZE)
                    {
                        m_producerWaiting[m_partitionIndex] = true;
                        Monitor.Wait(m_bufferLock);
                    }
                }
            } while (lastChunkSize == chunkSize); // A short chunk means the source has no more elements.
        }


        /// <summary>
        /// Creates and begins execution of a new set of spooling tasks.
        /// </summary>
        /// <param name="groupState">State shared among the tasks of the query.</param>
        /// <param name="partitions">The partitioned source; one spooling task is created per partition.</param>
        /// <param name="consumerWaiting">Per-partition consumer-waiting flags, shared with the tasks.</param>
        /// <param name="producerWaiting">Per-partition producer-waiting flags, shared with the tasks.</param>
        /// <param name="producerDone">Per-partition producer-done flags, shared with the tasks.</param>
        /// <param name="buffers">Array whose slots are populated here with one new queue per partition.</param>
        /// <param name="bufferLocks">Array whose slots are populated here with one new lock object per partition.</param>
        /// <param name="taskScheduler">The scheduler on which the root task and the spooling tasks run.</param>
        /// <param name="autoBuffered">Whether producers hand results over in chunks.</param>
        public static void Spool(
            QueryTaskGroupState groupState, PartitionedStream<TOutput, int> partitions,
            bool[] consumerWaiting, bool[] producerWaiting, bool[] producerDone,
            Queue<Pair<int, TOutput>>[] buffers, object[] bufferLocks,
            TaskScheduler taskScheduler, bool autoBuffered)
        {
            Contract.Assert(groupState != null);
            Contract.Assert(partitions != null);
            Contract.Assert(producerDone != null && producerDone.Length == partitions.PartitionCount);
            Contract.Assert(buffers != null && buffers.Length == partitions.PartitionCount);
            Contract.Assert(bufferLocks != null);

            int degreeOfParallelism = partitions.PartitionCount;

            // Initialize the buffers and buffer locks.
            for (int i = 0; i < degreeOfParallelism; i++)
            {
                buffers[i] = new Queue<Pair<int, TOutput>>(OrderPreservingPipeliningMergeHelper.INITIAL_BUFFER_SIZE);
                bufferLocks[i] = new object();
            }

            // Ensure all tasks in this query are parented under a common root. Because this
            // is a pipelined query, we detach it from the parent (to avoid blocking the calling
            // thread), and run the query on a separate thread.
            Task rootTask = new Task(
                () =>
                {
                    for (int i = 0; i < degreeOfParallelism; i++)
                    {
                        QueryTask asyncTask = new OrderPreservingPipeliningSpoolingTask<TOutput>(
                            partitions[i], groupState, consumerWaiting, producerWaiting,
                            producerDone, i, buffers, bufferLocks[i], taskScheduler, autoBuffered);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // And schedule it for execution.  This is done after beginning to ensure no thread tries to
            // end the query before its root task has been recorded properly.
            rootTask.Start(taskScheduler);

            // We don't call QueryEnd here; when we return, the query is still executing, and the
            // last enumerator to be disposed of will call QueryEnd for us.
        }

        /// <summary>
        /// Dispose the underlying enumerator and wake up the consumer if necessary.
        /// </summary>
        protected override void SpoolingFinally()
        {
            // Let the consumer know that this producer is done.
            lock (m_bufferLock)
            {
                m_producerDone[m_partitionIndex] = true;
                if (m_consumerWaiting[m_partitionIndex])
                {
                    Monitor.Pulse(m_bufferLock);
                    m_consumerWaiting[m_partitionIndex] = false;
                }
            }

            // Call the base implementation.
            base.SpoolingFinally();

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_partition.Dispose();
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// OrderPreservingPipeliningSpoolingTask.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System; 
using System.Collections.Generic;
using System.Linq; 
using System.Linq.Parallel; 
using System.Text;
using System.Threading; 
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel 
{
    /// <summary>
    /// Producer-side spooling task for an order-preserving, pipelined merge. Each task drains one
    /// source partition and enqueues its (key, element) pairs into that partition's queue in a
    /// shared buffer array, coordinating with the consumer through per-partition flag arrays and
    /// Monitor.Wait / Monitor.Pulse on the partition's buffer lock.
    /// </summary>
    /// <typeparam name="TOutput">The type of the elements produced by the source partition.</typeparam>
    class OrderPreservingPipeliningSpoolingTask<TOutput> : SpoolingTaskBase
    {
        private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
        private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query.
        private readonly QueryOperatorEnumerator<TOutput, int> m_partition; // The source partition.
        private readonly bool[] m_consumerWaiting; // Whether a consumer is waiting on a particular producer
        private readonly bool[] m_producerWaiting; // Whether a particular producer is waiting on the consumer
        private readonly bool[] m_producerDone; // Whether each producer is done
        private readonly int m_partitionIndex; // Index of the partition owned by this task.

        private readonly Queue<Pair<int, TOutput>>[] m_buffers; // The buffer for the results
        private readonly object m_bufferLock; // A lock for the buffer

        /// <summary>
        /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer.
        /// If false, the producer will make each result available to the consumer immediately after it is
        /// produced.
        /// </summary>
        private readonly bool m_autoBuffered;

        /// <summary>
        /// The number of elements to accumulate on the producer before copying the elements to the
        /// producer-consumer buffer. This constant is only used in the AutoBuffered mode.
        ///
        /// Experimentally, 16 appears to be sufficient buffer size to compensate for the synchronization
        /// cost.
        /// </summary>
        private const int PRODUCER_BUFFER_AUTO_SIZE = 16;

        /// <summary>
        /// Constructor. Stores the partition to drain and the state shared with the consumer.
        /// </summary>
        /// <param name="partition">The source partition this task enumerates.</param>
        /// <param name="taskGroupState">State shared among the tasks of the query.</param>
        /// <param name="consumerWaiting">Per-partition flags: the consumer is waiting on that producer.</param>
        /// <param name="producerWaiting">Per-partition flags: that producer is waiting on the consumer.</param>
        /// <param name="producerDone">Per-partition flags: that producer has finished.</param>
        /// <param name="partitionIndex">Index of the partition (and of the flag/buffer slots) owned by this task.</param>
        /// <param name="buffers">Per-partition result queues; this task enqueues only into its own slot.</param>
        /// <param name="bufferLock">The lock guarding this partition's queue and flags.</param>
        /// <param name="taskScheduler">The scheduler associated with the query.</param>
        /// <param name="autoBuffered">Whether results are handed over in chunks rather than one at a time.</param>
        internal OrderPreservingPipeliningSpoolingTask(
            QueryOperatorEnumerator<TOutput, int> partition,
            QueryTaskGroupState taskGroupState,
            bool[] consumerWaiting,
            bool[] producerWaiting,
            bool[] producerDone,
            int partitionIndex,
            Queue<Pair<int, TOutput>>[] buffers,
            object bufferLock,
            TaskScheduler taskScheduler,
            bool autoBuffered)
            : base(partitionIndex, taskGroupState)
        {
            Contract.Assert(partition != null);
            Contract.Assert(taskGroupState != null);
            Contract.Assert(consumerWaiting != null);
            Contract.Assert(producerWaiting != null && producerWaiting.Length == consumerWaiting.Length);
            Contract.Assert(producerDone != null && producerDone.Length == consumerWaiting.Length);
            Contract.Assert(buffers != null && buffers.Length == consumerWaiting.Length);
            Contract.Assert(partitionIndex >= 0 && partitionIndex < consumerWaiting.Length);
            // The lock is used by every lock/Wait/Pulse below, so a null here can never work.
            Contract.Assert(bufferLock != null);

            m_partition = partition;
            m_taskGroupState = taskGroupState;
            m_producerDone = producerDone;
            m_consumerWaiting = consumerWaiting;
            m_producerWaiting = producerWaiting;
            m_partitionIndex = partitionIndex;
            m_buffers = buffers;
            m_bufferLock = bufferLock;
            m_taskScheduler = taskScheduler;
            m_autoBuffered = autoBuffered;
        }

        /// <summary>
        /// This method is responsible for enumerating results and enqueueing them to
        /// the output buffer as appropriate.  Each derived class implements its own.
        ///
        /// Elements are first gathered into a local chunk without holding the lock (16 elements in
        /// auto-buffered mode, otherwise 1), then copied into the shared queue under the lock.
        /// </summary>
        protected override void SpoolingWork()
        {
            TOutput element = default(TOutput);
            int key = default(int);

            int chunkSize = m_autoBuffered ? PRODUCER_BUFFER_AUTO_SIZE : 1;
            Pair<int, TOutput>[] chunk = new Pair<int, TOutput>[chunkSize];
            var partition = m_partition;
            CancellationToken cancelToken = m_taskGroupState.CancellationState.MergedCancellationToken;

            int lastChunkSize;
            do
            {
                // Fill the local chunk outside the lock; stop early if the partition runs dry.
                lastChunkSize = 0;
                while (lastChunkSize < chunkSize && partition.MoveNext(ref element, ref key))
                {
                    chunk[lastChunkSize] = new Pair<int, TOutput>(key, element);
                    lastChunkSize++;
                }

                // Nothing was produced this round: the partition is exhausted.
                if (lastChunkSize == 0) break;

                lock (m_bufferLock)
                {
                    // Check if the query has been cancelled.
                    // (This break leaves the enclosing do-while; the lock is released on the way out.)
                    if (cancelToken.IsCancellationRequested)
                    {
                        break;
                    }

                    for (int i = 0; i < lastChunkSize; i++)
                    {
                        m_buffers[m_partitionIndex].Enqueue(chunk[i]);
                    }

                    // Wake the consumer if it is blocked waiting for results from this producer.
                    if (m_consumerWaiting[m_partitionIndex])
                    {
                        Monitor.Pulse(m_bufferLock);
                        m_consumerWaiting[m_partitionIndex] = false;
                    }

                    // If the producer buffer is too large, wait.
                    // Note: we already checked for cancellation after acquiring the lock on this producer.
                    // That guarantees that the consumer will eventually wake up the producer.
                    if (m_buffers[m_partitionIndex].Count >= OrderPreservingPipeliningMergeHelper.MAX_BUFFER_SIZE)
                    {
                        m_producerWaiting[m_partitionIndex] = true;
                        Monitor.Wait(m_bufferLock);
                    }
                }
            } while (lastChunkSize == chunkSize); // A short chunk means the source has no more elements.
        }


        /// <summary>
        /// Creates and begins execution of a new set of spooling tasks.
        /// </summary>
        /// <param name="groupState">State shared among the tasks of the query.</param>
        /// <param name="partitions">The partitioned source; one spooling task is created per partition.</param>
        /// <param name="consumerWaiting">Per-partition consumer-waiting flags, shared with the tasks.</param>
        /// <param name="producerWaiting">Per-partition producer-waiting flags, shared with the tasks.</param>
        /// <param name="producerDone">Per-partition producer-done flags, shared with the tasks.</param>
        /// <param name="buffers">Array whose slots are populated here with one new queue per partition.</param>
        /// <param name="bufferLocks">Array whose slots are populated here with one new lock object per partition.</param>
        /// <param name="taskScheduler">The scheduler on which the root task and the spooling tasks run.</param>
        /// <param name="autoBuffered">Whether producers hand results over in chunks.</param>
        public static void Spool(
            QueryTaskGroupState groupState, PartitionedStream<TOutput, int> partitions,
            bool[] consumerWaiting, bool[] producerWaiting, bool[] producerDone,
            Queue<Pair<int, TOutput>>[] buffers, object[] bufferLocks,
            TaskScheduler taskScheduler, bool autoBuffered)
        {
            Contract.Assert(groupState != null);
            Contract.Assert(partitions != null);
            Contract.Assert(producerDone != null && producerDone.Length == partitions.PartitionCount);
            Contract.Assert(buffers != null && buffers.Length == partitions.PartitionCount);
            Contract.Assert(bufferLocks != null);

            int degreeOfParallelism = partitions.PartitionCount;

            // Initialize the buffers and buffer locks.
            for (int i = 0; i < degreeOfParallelism; i++)
            {
                buffers[i] = new Queue<Pair<int, TOutput>>(OrderPreservingPipeliningMergeHelper.INITIAL_BUFFER_SIZE);
                bufferLocks[i] = new object();
            }

            // Ensure all tasks in this query are parented under a common root. Because this
            // is a pipelined query, we detach it from the parent (to avoid blocking the calling
            // thread), and run the query on a separate thread.
            Task rootTask = new Task(
                () =>
                {
                    for (int i = 0; i < degreeOfParallelism; i++)
                    {
                        QueryTask asyncTask = new OrderPreservingPipeliningSpoolingTask<TOutput>(
                            partitions[i], groupState, consumerWaiting, producerWaiting,
                            producerDone, i, buffers, bufferLocks[i], taskScheduler, autoBuffered);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // And schedule it for execution.  This is done after beginning to ensure no thread tries to
            // end the query before its root task has been recorded properly.
            rootTask.Start(taskScheduler);

            // We don't call QueryEnd here; when we return, the query is still executing, and the
            // last enumerator to be disposed of will call QueryEnd for us.
        }

        /// <summary>
        /// Dispose the underlying enumerator and wake up the consumer if necessary.
        /// </summary>
        protected override void SpoolingFinally()
        {
            // Let the consumer know that this producer is done.
            lock (m_bufferLock)
            {
                m_producerDone[m_partitionIndex] = true;
                if (m_consumerWaiting[m_partitionIndex])
                {
                    Monitor.Pulse(m_bufferLock);
                    m_consumerWaiting[m_partitionIndex] = false;
                }
            }

            // Call the base implementation.
            base.SpoolingFinally();

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_partition.Dispose();
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK