SpoolingTask.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / SpoolingTask.cs / 1305376 / SpoolingTask.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// SpoolingTask.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading; 
using System.Threading.Tasks;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 

    /// <summary>
    /// Static entry points that start the spooling tasks of a query. Three flavours are
    /// offered: stop-and-go (blocking, with channels), pipelined (non-blocking, with
    /// channels) and for-all (blocking, no channels).
    /// </summary>
    internal static class SpoolingTask
    {
        //-----------------------------------------------------------------------------------
        // Stop-and-go spooling. One StopAndGoSpoolingTask is created per partition; all but
        // the last are started asynchronously and the last one is run synchronously inside
        // the root task. The root task itself is run synchronously by this method, and
        // QueryEnd is invoked on the group state before returning.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators, one per partition
        //     channels        - the producer-consumer channels, one per partition
        //     taskScheduler   - the scheduler on which every task is run
        //

        internal static void SpoolStopAndGo(
            QueryTaskGroupState groupState, PartitionedStream partitions,
            SynchronousChannel[] channels, TaskScheduler taskScheduler)
        {
            Contract.Assert(partitions.PartitionCount == channels.Length);
            Contract.Assert(groupState != null);

            // The body of the common root task under which all partition tasks are created.
            Action rootBody = () =>
            {
                int lastPartition = partitions.PartitionCount - 1;

                // Every partition except the last is handed off to the scheduler.
                for (int index = 0; index < lastPartition; index++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", index);

                    QueryTask background = new StopAndGoSpoolingTask(index, groupState, partitions[index], channels[index]);
                    background.RunAsynchronously(taskScheduler);
                }

                TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", lastPartition);

                // The final partition is processed right here rather than being queued.
                QueryTask inline = new StopAndGoSpoolingTask(
                    lastPartition, groupState, partitions[lastPartition], channels[lastPartition]);
                inline.RunSynchronously(taskScheduler);
            };

            Task rootTask = new Task(rootBody);

            // Register the root task with the group state before it runs.
            groupState.QueryBegin(rootTask);

            // Run the root task to completion before going any further.
            rootTask.RunSynchronously(taskScheduler);

            // End the query. (Pipelined queries do not do this here; see SpoolPipeline.)
            groupState.QueryEnd(false);
        }

        //-----------------------------------------------------------------------------------
        // Pipelined spooling. One PipelineSpoolingTask is created per partition and every
        // one of them is started asynchronously from within a root task that is itself
        // started (not run synchronously) on the scheduler. QueryEnd is NOT called here.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators, one per partition
        //     channels        - the producer-consumer channels, one per partition
        //     taskScheduler   - the scheduler on which every task is run
        //

        internal static void SpoolPipeline(
            QueryTaskGroupState groupState, PartitionedStream partitions,
            AsynchronousChannel[] channels, TaskScheduler taskScheduler)
        {
            Contract.Assert(partitions.PartitionCount == channels.Length);
            Contract.Assert(groupState != null);

            // The root task only fans out the per-partition tasks and then finishes; it is
            // started asynchronously below so that the caller is not blocked by it.
            Action rootBody = () =>
            {
                for (int index = 0; index < partitions.PartitionCount; index++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", index);

                    QueryTask background = new PipelineSpoolingTask(index, groupState, partitions[index], channels[index]);
                    background.RunAsynchronously(taskScheduler);
                }
            };

            Task rootTask = new Task(rootBody);

            // Register the root task with the group state first ...
            groupState.QueryBegin(rootTask);

            // ... and only then schedule it, so that nothing can try to end the query
            // before its root task has been recorded.
            rootTask.Start(taskScheduler);

            // No QueryEnd here: the query is still executing when this method returns. Per
            // the original author's note, the last enumerator to be disposed calls QueryEnd.
        }

        //-----------------------------------------------------------------------------------
        // For-all spooling. One ForAllSpoolingTask is created per partition; all but the
        // last are started asynchronously and the last is run synchronously inside the root
        // task. No channels are involved: partitions are enumerated purely "for effect".
        // The root task is run synchronously and QueryEnd is invoked before returning.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators, one per partition
        //     taskScheduler   - the scheduler on which every task is run
        //

        internal static void SpoolForAll(
            QueryTaskGroupState groupState, PartitionedStream partitions, TaskScheduler taskScheduler)
        {
            Contract.Assert(groupState != null);

            // The body of the common root task under which all partition tasks are created.
            Action rootBody = () =>
            {
                int lastPartition = partitions.PartitionCount - 1;

                // Every partition except the last is handed off to the scheduler.
                for (int index = 0; index < lastPartition; index++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", index);

                    QueryTask background = new ForAllSpoolingTask(index, groupState, partitions[index]);
                    background.RunAsynchronously(taskScheduler);
                }

                TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", lastPartition);

                // The final partition is processed right here rather than being queued.
                QueryTask inline = new ForAllSpoolingTask(lastPartition, groupState, partitions[lastPartition]);
                inline.RunSynchronously(taskScheduler);
            };

            Task rootTask = new Task(rootBody);

            // Register the root task with the group state before it runs.
            groupState.QueryBegin(rootTask);

            // Run the root task to completion before going any further.
            rootTask.RunSynchronously(taskScheduler);

            // End the query. (Pipelined queries do not do this here; see SpoolPipeline.)
            groupState.QueryEnd(false);
        }
    }

    /// <summary>
    /// A spooling task that drains a single producer enumerator into a single
    /// <c>SynchronousChannel</c>. The destination is optional: when it is null the source
    /// is still enumerated, but nothing is enqueued.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>TInputOutput</c> and <c>TIgnoreKey</c> are used below but are not
    /// declared anywhere in this view; the generic parameter lists on this class and on
    /// the enumerator/channel types look like they were lost in extraction - confirm
    /// against the original source.
    /// </remarks>
    internal class StopAndGoSpoolingTask : SpoolingTaskBase
    {
        // The data source from which to pull data. Assigned once, in the constructor.
        private readonly QueryOperatorEnumerator m_source;

        // The destination channel into which data is placed. May be null, in which case
        // the source is enumerated without enqueueing anything.
        private readonly SynchronousChannel m_destination;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task (forwarded to the base class)
        //     groupState  - values for inter-task communication (forwarded to the base class)
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null (asserted); destination may be null.
        //

        internal StopAndGoSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator source, SynchronousChannel destination)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //-----------------------------------------------------------------------------------
        // Enumerates the whole source, placing each element into the destination channel
        // (when there is one). Each derived class implements its own version of this.
        //

        protected override void SpoolingWork()
        {
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            // Copy the fields into locals for use inside the loop.
            QueryOperatorEnumerator source = m_source;
            SynchronousChannel destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            // The destination is documented as optional and SpoolingFinally already treats
            // it that way, so guard every use here too instead of throwing a
            // NullReferenceException when no channel was supplied.
            if (destination != null)
            {
                destination.Init();
            }

            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately. The check
                // happens after MoveNext, so the element just produced is dropped.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                if (destination != null)
                {
                    destination.Enqueue(current);
                }
            }
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete, then release the source.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, if there is a channel to signal.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task that drains a single producer enumerator into a single
    /// <c>AsynchronousChannel</c>, flushing the channel's buffers once enumeration stops.
    /// The destination is optional: when it is null the source is still enumerated, but
    /// nothing is enqueued or flushed.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>TInputOutput</c> and <c>TIgnoreKey</c> are used below but are not
    /// declared anywhere in this view; the generic parameter lists on this class and on
    /// the enumerator/channel types look like they were lost in extraction - confirm
    /// against the original source.
    /// </remarks>
    internal class PipelineSpoolingTask : SpoolingTaskBase
    {
        // The data source from which to pull data. Assigned once, in the constructor.
        private readonly QueryOperatorEnumerator m_source;

        // The destination channel into which data is placed. May be null, in which case
        // the source is enumerated without enqueueing anything.
        private readonly AsynchronousChannel m_destination;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task (forwarded to the base class)
        //     groupState  - values for inter-task communication (forwarded to the base class)
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null (asserted); destination may be null.
        //

        internal PipelineSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator source, AsynchronousChannel destination)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //-----------------------------------------------------------------------------------
        // Enumerates the whole source, placing each element into the destination channel
        // (when there is one), then flushes the channel. Each derived class implements its
        // own version of this.
        //

        protected override void SpoolingWork()
        {
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            // Copy the fields into locals for use inside the loop.
            QueryOperatorEnumerator source = m_source;
            AsynchronousChannel destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately. The check
                // happens after MoveNext, so the element just produced is dropped.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                // The destination is documented as optional and SpoolingFinally already
                // treats it that way, so guard its use here instead of throwing a
                // NullReferenceException when no channel was supplied.
                if (destination != null)
                {
                    destination.Enqueue(current);
                }
            }

            // Flush remaining data to the query consumer in preparation for channel shutdown.
            // This also runs after a cancellation break, exactly as before.
            if (destination != null)
            {
                destination.FlushBuffers();
            }
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete, then release the source.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, if there is a channel to signal.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task that enumerates a single producer enumerator purely "for effect":
    /// every element is pulled from the source and then discarded. Unlike the other
    /// spooling tasks in this file there is no destination channel.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>TInputOutput</c> and <c>TIgnoreKey</c> are used below but are not
    /// declared anywhere in this view; the generic parameter lists look like they were
    /// lost in extraction - confirm against the original source.
    /// </remarks>
    internal class ForAllSpoolingTask : SpoolingTaskBase
    {
        // The enumerator this task drains.
        private QueryOperatorEnumerator m_source;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new for-all spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task (forwarded to the base class)
        //     groupState  - values for inter-task communication (forwarded to the base class)
        //     source      - the producer enumerator
        //
        // Assumptions:
        //     Source cannot be null (asserted).
        //

        internal ForAllSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator source)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
        }

        //-----------------------------------------------------------------------------------
        // Pulls elements from the source until it is exhausted. Nothing is enqueued
        // anywhere; each derived class implements its own version of this method.
        //

        protected override void SpoolingWork()
        {
            // Scratch slots that MoveNext writes into; their values are never read.
            TInputOutput discardedElement = default(TInputOutput);
            TIgnoreKey discardedKey = default(TIgnoreKey);

            // No cancellation check here. Per the original author's note, this task only
            // ever runs with a ForAll operator, and ForAllEnumerator performs the
            // cancellation checks itself.
            bool produced;
            do
            {
                produced = m_source.MoveNext(ref discardedElement, ref discardedKey);
            }
            while (produced);
        }

        //-----------------------------------------------------------------------------------
        // Cleanup: run the base-class cleanup, then release the source enumerator.
        //

        protected override void SpoolingFinally()
        {
            // Base implementation first.
            base.SpoolingFinally();

            // Then dispose of the source enumerator.
            m_source.Dispose();
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// SpoolingTask.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading; 
using System.Threading.Tasks;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 

    /// <summary>
    /// Static entry points that start the spooling tasks of a query. Three flavours are
    /// offered: stop-and-go (blocking, with channels), pipelined (non-blocking, with
    /// channels) and for-all (blocking, no channels).
    /// </summary>
    internal static class SpoolingTask
    {
        //-----------------------------------------------------------------------------------
        // Stop-and-go spooling. One StopAndGoSpoolingTask is created per partition; all but
        // the last are started asynchronously and the last one is run synchronously inside
        // the root task. The root task itself is run synchronously by this method, and
        // QueryEnd is invoked on the group state before returning.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators, one per partition
        //     channels        - the producer-consumer channels, one per partition
        //     taskScheduler   - the scheduler on which every task is run
        //

        internal static void SpoolStopAndGo(
            QueryTaskGroupState groupState, PartitionedStream partitions,
            SynchronousChannel[] channels, TaskScheduler taskScheduler)
        {
            Contract.Assert(partitions.PartitionCount == channels.Length);
            Contract.Assert(groupState != null);

            // The body of the common root task under which all partition tasks are created.
            Action rootBody = () =>
            {
                int lastPartition = partitions.PartitionCount - 1;

                // Every partition except the last is handed off to the scheduler.
                for (int index = 0; index < lastPartition; index++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", index);

                    QueryTask background = new StopAndGoSpoolingTask(index, groupState, partitions[index], channels[index]);
                    background.RunAsynchronously(taskScheduler);
                }

                TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", lastPartition);

                // The final partition is processed right here rather than being queued.
                QueryTask inline = new StopAndGoSpoolingTask(
                    lastPartition, groupState, partitions[lastPartition], channels[lastPartition]);
                inline.RunSynchronously(taskScheduler);
            };

            Task rootTask = new Task(rootBody);

            // Register the root task with the group state before it runs.
            groupState.QueryBegin(rootTask);

            // Run the root task to completion before going any further.
            rootTask.RunSynchronously(taskScheduler);

            // End the query. (Pipelined queries do not do this here; see SpoolPipeline.)
            groupState.QueryEnd(false);
        }

        //-----------------------------------------------------------------------------------
        // Pipelined spooling. One PipelineSpoolingTask is created per partition and every
        // one of them is started asynchronously from within a root task that is itself
        // started (not run synchronously) on the scheduler. QueryEnd is NOT called here.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators, one per partition
        //     channels        - the producer-consumer channels, one per partition
        //     taskScheduler   - the scheduler on which every task is run
        //

        internal static void SpoolPipeline(
            QueryTaskGroupState groupState, PartitionedStream partitions,
            AsynchronousChannel[] channels, TaskScheduler taskScheduler)
        {
            Contract.Assert(partitions.PartitionCount == channels.Length);
            Contract.Assert(groupState != null);

            // The root task only fans out the per-partition tasks and then finishes; it is
            // started asynchronously below so that the caller is not blocked by it.
            Action rootBody = () =>
            {
                for (int index = 0; index < partitions.PartitionCount; index++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", index);

                    QueryTask background = new PipelineSpoolingTask(index, groupState, partitions[index], channels[index]);
                    background.RunAsynchronously(taskScheduler);
                }
            };

            Task rootTask = new Task(rootBody);

            // Register the root task with the group state first ...
            groupState.QueryBegin(rootTask);

            // ... and only then schedule it, so that nothing can try to end the query
            // before its root task has been recorded.
            rootTask.Start(taskScheduler);

            // No QueryEnd here: the query is still executing when this method returns. Per
            // the original author's note, the last enumerator to be disposed calls QueryEnd.
        }

        //-----------------------------------------------------------------------------------
        // For-all spooling. One ForAllSpoolingTask is created per partition; all but the
        // last are started asynchronously and the last is run synchronously inside the root
        // task. No channels are involved: partitions are enumerated purely "for effect".
        // The root task is run synchronously and QueryEnd is invoked before returning.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators, one per partition
        //     taskScheduler   - the scheduler on which every task is run
        //

        internal static void SpoolForAll(
            QueryTaskGroupState groupState, PartitionedStream partitions, TaskScheduler taskScheduler)
        {
            Contract.Assert(groupState != null);

            // The body of the common root task under which all partition tasks are created.
            Action rootBody = () =>
            {
                int lastPartition = partitions.PartitionCount - 1;

                // Every partition except the last is handed off to the scheduler.
                for (int index = 0; index < lastPartition; index++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", index);

                    QueryTask background = new ForAllSpoolingTask(index, groupState, partitions[index]);
                    background.RunAsynchronously(taskScheduler);
                }

                TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", lastPartition);

                // The final partition is processed right here rather than being queued.
                QueryTask inline = new ForAllSpoolingTask(lastPartition, groupState, partitions[lastPartition]);
                inline.RunSynchronously(taskScheduler);
            };

            Task rootTask = new Task(rootBody);

            // Register the root task with the group state before it runs.
            groupState.QueryBegin(rootTask);

            // Run the root task to completion before going any further.
            rootTask.RunSynchronously(taskScheduler);

            // End the query. (Pipelined queries do not do this here; see SpoolPipeline.)
            groupState.QueryEnd(false);
        }
    }

    /// <summary>
    /// A spooling task that drains a single producer enumerator into a single
    /// <c>SynchronousChannel</c>. The destination is optional: when it is null the source
    /// is still enumerated, but nothing is enqueued.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>TInputOutput</c> and <c>TIgnoreKey</c> are used below but are not
    /// declared anywhere in this view; the generic parameter lists on this class and on
    /// the enumerator/channel types look like they were lost in extraction - confirm
    /// against the original source.
    /// </remarks>
    internal class StopAndGoSpoolingTask : SpoolingTaskBase
    {
        // The data source from which to pull data. Assigned once, in the constructor.
        private readonly QueryOperatorEnumerator m_source;

        // The destination channel into which data is placed. May be null, in which case
        // the source is enumerated without enqueueing anything.
        private readonly SynchronousChannel m_destination;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task (forwarded to the base class)
        //     groupState  - values for inter-task communication (forwarded to the base class)
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null (asserted); destination may be null.
        //

        internal StopAndGoSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator source, SynchronousChannel destination)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //-----------------------------------------------------------------------------------
        // Enumerates the whole source, placing each element into the destination channel
        // (when there is one). Each derived class implements its own version of this.
        //

        protected override void SpoolingWork()
        {
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            // Copy the fields into locals for use inside the loop.
            QueryOperatorEnumerator source = m_source;
            SynchronousChannel destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            // The destination is documented as optional and SpoolingFinally already treats
            // it that way, so guard every use here too instead of throwing a
            // NullReferenceException when no channel was supplied.
            if (destination != null)
            {
                destination.Init();
            }

            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately. The check
                // happens after MoveNext, so the element just produced is dropped.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                if (destination != null)
                {
                    destination.Enqueue(current);
                }
            }
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete, then release the source.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, if there is a channel to signal.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task that drains a single producer enumerator into a single
    /// <c>AsynchronousChannel</c>, flushing the channel's buffers once enumeration stops.
    /// The destination is optional: when it is null the source is still enumerated, but
    /// nothing is enqueued or flushed.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>TInputOutput</c> and <c>TIgnoreKey</c> are used below but are not
    /// declared anywhere in this view; the generic parameter lists on this class and on
    /// the enumerator/channel types look like they were lost in extraction - confirm
    /// against the original source.
    /// </remarks>
    internal class PipelineSpoolingTask : SpoolingTaskBase
    {
        // The data source from which to pull data. Assigned once, in the constructor.
        private readonly QueryOperatorEnumerator m_source;

        // The destination channel into which data is placed. May be null, in which case
        // the source is enumerated without enqueueing anything.
        private readonly AsynchronousChannel m_destination;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task (forwarded to the base class)
        //     groupState  - values for inter-task communication (forwarded to the base class)
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null (asserted); destination may be null.
        //

        internal PipelineSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator source, AsynchronousChannel destination)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //-----------------------------------------------------------------------------------
        // Enumerates the whole source, placing each element into the destination channel
        // (when there is one), then flushes the channel. Each derived class implements its
        // own version of this.
        //

        protected override void SpoolingWork()
        {
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            // Copy the fields into locals for use inside the loop.
            QueryOperatorEnumerator source = m_source;
            AsynchronousChannel destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately. The check
                // happens after MoveNext, so the element just produced is dropped.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                // The destination is documented as optional and SpoolingFinally already
                // treats it that way, so guard its use here instead of throwing a
                // NullReferenceException when no channel was supplied.
                if (destination != null)
                {
                    destination.Enqueue(current);
                }
            }

            // Flush remaining data to the query consumer in preparation for channel shutdown.
            // This also runs after a cancellation break, exactly as before.
            if (destination != null)
            {
                destination.FlushBuffers();
            }
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete, then release the source.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, if there is a channel to signal.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task that drains a single source enumerator purely for effect. The
    /// elements produced by the enumerator are discarded; this task has no destination
    /// channel.
    /// </summary>
    /// <remarks>
    /// NOTE(review): TInputOutput and TIgnoreKey are used below but are not declared on
    /// this type in the current listing; the generic parameter lists were presumably lost
    /// when the file was extracted -- confirm against the original source.
    /// </remarks>
    internal class ForAllSpoolingTask : SpoolingTaskBase
    {
        // The enumerator this task drains.
        private QueryOperatorEnumerator m_source;

        //------------------------------------------------------------------------------------
        // Creates, but does not execute, a new for-effect spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task (forwarded to the base class)
        //     groupState  - values for inter-task communication (forwarded to the base class)
        //     source      - the producer enumerator
        //
        // Assumptions:
        //     Source cannot be null (asserted below).
        //

        internal ForAllSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            QueryOperatorEnumerator source)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
        }

        //-----------------------------------------------------------------------------------
        // Advances the source enumerator until it reports that it has no more elements.
        // Nothing is enqueued anywhere; the values written to the ref arguments are ignored.
        //

        protected override void SpoolingWork()
        {
            QueryOperatorEnumerator source = m_source;

            // Scratch slots required by the MoveNext signature; their contents are never read.
            TInputOutput discardedElement = default(TInputOutput);
            TIgnoreKey discardedKey = default(TIgnoreKey);

            // There is no cancellation check here: per the original note, this only ever
            // runs with a ForAll operator, and ForAllEnumerator performs cancellation checks.
            while (source.MoveNext(ref discardedElement, ref discardedKey))
            {
                // Intentionally empty: the work happens inside MoveNext.
            }
        }

        //-----------------------------------------------------------------------------------
        // Cleanup that runs after the spooling work: runs the base cleanup, then releases
        // the source enumerator.
        //

        protected override void SpoolingFinally()
        {
            base.SpoolingFinally();

            // Release the source enumerator once the base cleanup has run.
            m_source.Dispose();
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK