QueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / QueryOperator.cs / 1305376 / QueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// QueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections; 
using System.Collections.Generic;
using System.Threading; 
using System.Threading.Tasks; 
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// This is the abstract base class for all query operators in the system. It
    /// implements the ParallelQuery{T} type so that it can be bound as the source
    /// of parallel queries and so that it can be returned as the result of parallel query
    /// operations. Not much is in here, although it does serve as the "entry point" for
    /// opening all query operators: it will lazily analyze and cache a plan the first
    /// time the tree is opened, and will open the tree upon calls to GetEnumerator.
    /// </summary>
    /// <remarks>
    /// This class implements ParallelQuery so that any parallel query operator
    /// can bind to the parallel query provider overloads. This allows us to string
    /// together operators without the user always specifying AsParallel, e.g.
    /// Select(Where(..., ...), ...), and so forth.
    /// </remarks>
    internal abstract class QueryOperator : ParallelQuery 
    {
        // Whether the output of this operator is ordered. Surfaced to the rest of the
        // assembly through the OutputOrdered property.
        protected bool m_outputOrdered;

        // Builds an operator whose output is not ordered, using the supplied settings.
        internal QueryOperator(QuerySettings settings)
            : this(false, settings)
        {
        }

        // Builds an operator, recording whether its output is ordered. The settings
        // are forwarded to the ParallelQuery base constructor.
        internal QueryOperator(bool isOrdered, QuerySettings settings)
            : base(settings)
        {
            this.m_outputOrdered = isOrdered;
        }

        //----------------------------------------------------------------------------------------
        // Opens the query operator, doing whatever is necessary to begin enumerating its
        // results. In some cases this includes actually introducing parallelism, enumerating
        // other query operators, and so on. The method is abstract; each concrete operator
        // class supplies its own implementation.
        //
        // Arguments:
        //     settings       - various flags and settings to control query execution
        //     preferStriping - whether the caller prefers striped partitioning over range
        //                      partitioning
        //
        // Return Value:
        //     The query results: conceptually either a single enumerator, or a partition
        //     (for partition parallelism).
        //

        internal abstract QueryResults Open(QuerySettings settings, bool preferStriping);
 
        //---------------------------------------------------------------------------------------
        // The GetEnumerator method is the standard IEnumerable mechanism for walking the
        // contents of a query. Note that GetEnumerator is only ever called on the root node:
        // we then proceed by calling Open on all of the subsequent query nodes.
        //
        // This parameterless overload leaves the merge options unspecified (null) and does
        // not suppress order preservation; it forwards to the two-argument overload, whose
        // parameters are:
        //     mergeOptions              - the merge (buffering) options to use, or null to
        //                                 leave the choice unspecified
        //     suppressOrderPreservation - whether to shut order preservation off, regardless
        //                                 of the contents of the query
        //
        // Return Value:
        //     An enumerator that retrieves elements from the query output.
        //
        // Notes:
        //     The default mode of execution is to pipeline the query execution with respect
        //     to the GetEnumerator caller (aka the consumer). An overload is available
        //     that can be used to override the default with an explicit choice.
        //

        public override IEnumerator GetEnumerator()
        {
            // Buffering is unspecified and order preservation is not suppressed.
            ParallelMergeOptions? unspecifiedMergeOptions = null;
            const bool suppressOrderPreservation = false;

            return GetEnumerator(unspecifiedMergeOptions, suppressOrderPreservation);
        }

        // Overload that lets the caller supply the merge options explicitly. Order
        // preservation is not suppressed.
        public IEnumerator GetEnumerator(ParallelMergeOptions? mergeOptions)
        {
            // Forward the caller's merge options untouched; order preservation is
            // not suppressed by default.
            const bool suppressOrderPreservation = false;

            return GetEnumerator(mergeOptions, suppressOrderPreservation);
        }
 
        //---------------------------------------------------------------------------------------
        // Reports whether the output of this operator is ordered, as recorded in
        // m_outputOrdered.
        //

        internal bool OutputOrdered
        {
            get
            {
                return this.m_outputOrdered;
            }
        }
 
        // Core GetEnumerator overload; the other two overloads funnel into this one.
        //
        // Arguments:
        //     mergeOptions              - merge options to hand to the enumerator (may be null)
        //     suppressOrderPreservation - whether order preservation should be shut off
        //
        // Return Value:
        //     A QueryOpeningEnumerator bound to this operator.
        internal virtual IEnumerator GetEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrderPreservation)
        {
            // Hand back a placeholder enumerator that calls GetOpenedEnumerator() on 'this'
            // QueryOperator the first time the user calls MoveNext(). Deferring the work this
            // way means the query is never executed if the user never calls MoveNext().
            var lazyEnumerator = new QueryOpeningEnumerator(this, mergeOptions, suppressOrderPreservation);
            return lazyEnumerator;
        }

        //---------------------------------------------------------------------------------------
        // The GetOpenedEnumerator method returns an enumerator that walks the contents of a
        // query. The enumerator will be "opened", which means that PLINQ will start executing
        // the query immediately, even before the user calls MoveNext() for the first time.
        //
        // Arguments:
        //     mergeOptions  - merge options to use; when null, querySettings.MergeOptions is used
        //     suppressOrder - when true, the merge is not ordered even if the output is ordered
        //     forEffect     - passed on to the merger; when true, null is returned instead of
        //                     an enumerator
        //     querySettings - the settings for this execution of the query
        //
        // Return Value:
        //     An enumerator over the query output, or null when forEffect is true and the
        //     query was not run sequentially.
        //
        // Throws:
        //     OperationCanceledException if cancellation had already been requested by the
        //     time the pre-emptive check below runs.
        //
        internal IEnumerator GetOpenedEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrder, bool forEffect,
            QuerySettings querySettings)
        {
            // If the top-level enumerator forces a premature merge, run the query sequentially.
            bool runSequentially =
                querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism;

            if (runSequentially)
            {
                IEnumerable sequentialQuery = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken);
                var wrappedQuery = ExceptionAggregator.WrapEnumerable(sequentialQuery, querySettings.CancellationState);
                return wrappedQuery.GetEnumerator();
            }

            QueryResults queryResults = GetQueryResults(querySettings);

            // Fall back to the merge options carried by the settings when none were supplied.
            mergeOptions = mergeOptions ?? querySettings.MergeOptions;

            Contract.Assert(mergeOptions != null);

            // Top-level pre-emptive cancellation test.
            // This handles situations where cancellation has occurred before execution commences.
            // The handling for in-execution occurs in QueryTaskGroupState.QueryEnd()
            if (querySettings.CancellationState.MergedCancellationToken.IsCancellationRequested)
            {
                if (!querySettings.CancellationState.ExternalCancellationToken.IsCancellationRequested)
                {
                    throw new OperationCanceledException();
                }

                // The external token was cancelled, so attach it to the exception.
                throw new OperationCanceledException(querySettings.CancellationState.ExternalCancellationToken);
            }

            // The merge is ordered only if the output is ordered and the caller did not opt out.
            bool orderedMerge = !suppressOrder && OutputOrdered;

            var merger = new PartitionedStreamMerger(
                forEffect,
                mergeOptions.GetValueOrDefault(),
                querySettings.TaskScheduler,
                orderedMerge,
                querySettings.CancellationState,
                querySettings.QueryId);

            // Hook up the data flow between the operator-executors, starting from the merger.
            queryResults.GivePartitionedStream(merger);

            // A for-effect run hands back no enumerator.
            return forEffect ? null : merger.MergeExecutor.GetEnumerator();
        }

 
        // This method is called only once, on the 'head operator', which is the last specified
        // operator in the query. It then recursively uses Open() to prepare itself and the
        // other enumerators.
        //
        // Arguments:
        //     querySettings - the settings for this execution; the task scheduler, degree of
        //                     parallelism and execution mode are asserted to be present
        //
        // Return Value:
        //     The QueryResults produced by Open() with preferStriping set to false.
        private QueryResults GetQueryResults(QuerySettings querySettings)
        {
            TraceHelpers.TraceInfo("[timing]: {0}: starting execution - QueryOperator<>::GetQueryResults", DateTime.Now.Ticks);

            // All mandatory query settings must be specified
            Contract.Assert(querySettings.TaskScheduler != null);
            Contract.Assert(querySettings.DegreeOfParallelism.HasValue);
            Contract.Assert(querySettings.ExecutionMode.HasValue);

            // Now just open the query tree's root operator, supplying a specific DOP
            const bool preferStriping = false;
            QueryResults openedResults = Open(querySettings, preferStriping);
            return openedResults;
        }

        //----------------------------------------------------------------------------------------
        // Executes the query and returns the results in an array.
        //
        // The settings for this execution are derived from SpecifiedQuerySettings with the
        // per-execution settings and defaults applied. The logical-query begin/end lifecycle
        // notifications always bracket the execution, even when it throws.
        //
        // Return Value:
        //     An array holding the query output.
        //

        internal TOutput[] ExecuteAndGetResultsAsArray()
        {
            QuerySettings querySettings = SpecifiedQuerySettings.WithPerExecutionSettings().WithDefaults();

            QueryLifecycle.LogicalQueryExecutionBegin(querySettings.QueryId);
            try
            {
                // A premature merge under the default execution mode means we run sequentially.
                bool runSequentially =
                    querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism;

                if (runSequentially)
                {
                    IEnumerable sequentialQuery = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken);
                    IEnumerable cancellableQuery = CancellableEnumerable.Wrap(
                        sequentialQuery, querySettings.CancellationState.ExternalCancellationToken);
                    return ExceptionAggregator.WrapEnumerable(cancellableQuery, querySettings.CancellationState).ToArray();
                }

                QueryResults results = GetQueryResults(querySettings);

                if (!results.IsIndexible || !OutputOrdered)
                {
                    // In the unordered (or non-indexible) case, it appears that the
                    // stop-and-go merge performs a little better.
                    PartitionedStreamMerger streamMerger = new PartitionedStreamMerger(
                        false,
                        ParallelMergeOptions.FullyBuffered,
                        querySettings.TaskScheduler,
                        OutputOrdered,
                        querySettings.CancellationState,
                        querySettings.QueryId);
                    results.GivePartitionedStream(streamMerger);

                    TOutput[] mergedOutput = streamMerger.MergeExecutor.GetResultsAsArray();
                    querySettings.CleanStateAtQueryEnd();
                    return mergedOutput;
                }

                // The special array-based merge performs better if the output is ordered,
                // because it does not have to pay for ordering.
                // NOTE(review): the helper receives SpecifiedQuerySettings rather than the
                // per-execution querySettings computed above -- confirm this is intentional.
                ArrayMergeHelper arrayMerger = new ArrayMergeHelper(SpecifiedQuerySettings, results);
                arrayMerger.Execute();

                TOutput[] indexedOutput = arrayMerger.GetResultsAsArray();
                querySettings.CleanStateAtQueryEnd();
                return indexedOutput;
            }
            finally
            {
                QueryLifecycle.LogicalQueryExecutionEnd(querySettings.QueryId);
            }
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // Arguments:
        //     token - a cancellation token; callers in this class pass the external
        //             cancellation token taken from the query's cancellation state
        //
        // Note that iterating the returned enumerable will not wrap exceptions in an
        // AggregateException. Before this enumerable is returned to the user, we must wrap
        // it with an ExceptionAggregator.
        //

        internal abstract IEnumerable AsSequentialQuery(CancellationToken token); 


        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge. Within this class, a true value
        // combined with ParallelExecutionMode.Default causes the query to be run sequentially.
        //
 
        internal abstract bool LimitsParallelism { get; } 

        //----------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator.
        // Supplied by each concrete operator; it is not read by any member visible in this file.
        //

        internal abstract OrdinalIndexState OrdinalIndexState { get; } 

        //---------------------------------------------------------------------------------------
        // A helper method that executes the query rooted at the openedChild operator, and
        // returns the results as ListQueryResults.
        //
        // Arguments:
        //     openedChild    - the partitioned stream to execute and merge
        //     partitionCount - forwarded to the ListQueryResults that is returned
        //     outputOrdered  - forwarded to the merge executor as its ordering flag
        //     useStriping    - forwarded to the ListQueryResults that is returned
        //     settings       - supplies the task scheduler, cancellation state and query id
        //
        // Return Value:
        //     A ListQueryResults wrapping the array produced by a fully buffered merge.
        //

        internal static ListQueryResults ExecuteAndCollectResults(
            PartitionedStream openedChild,
            int partitionCount,
            bool outputOrdered,
            bool useStriping,
            QuerySettings settings)
        {
            MergeExecutor executor = MergeExecutor.Execute(
                openedChild,
                false,
                ParallelMergeOptions.FullyBuffered,
                settings.TaskScheduler,
                outputOrdered,
                settings.CancellationState,
                settings.QueryId);

            var collected = executor.GetResultsAsArray();
            return new ListQueryResults(collected, partitionCount, useStriping);
        }
 

        //----------------------------------------------------------------------------------------
        // Returns a QueryOperator for any IEnumerable data source. If the source already is a
        // query operator it is returned as-is (just a cast); if it is an OrderedParallelQuery
        // its sort operator is returned; otherwise a scan operator is allocated on top of it.
        //
        // Arguments:
        //    source  - any enumerable data source to be wrapped (asserted to be non-null)
        //
        // Return Value:
        //    A query operator.
        //

        internal static QueryOperator AsQueryOperator(IEnumerable source)
        {
            Contract.Assert(source != null);

            // Just try casting the data source to a query operator, in the case that
            // our child is just another query operator.
            QueryOperator sourceAsOperator = source as QueryOperator;

            if (sourceAsOperator == null)
            {
                // We have to handle OrderedParallelQuery specially. In all other cases,
                // ParallelQuery *is* the QueryOperator. But, OrderedParallelQuery
                // is not QueryOperator, it only has a reference to one. Ideally, we
                // would want SortQueryOperator to inherit from OrderedParallelQuery,
                // but that conflicts with other constraints on our class hierarchy.
                //
                // If that cast fails as well, then the data source is a real piece of
                // data, and we just construct a new scan operator on top of it.
                OrderedParallelQuery orderedQuery = source as OrderedParallelQuery;

                sourceAsOperator = (orderedQuery != null)
                    ? (QueryOperator)orderedQuery.SortOperator
                    : new ScanQueryOperator(source);
            }

            Contract.Assert(sourceAsOperator != null);

            return sourceAsOperator;
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK