ZipQueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / ZipQueryOperator.cs / 1305376 / ZipQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// ZipQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// A Zip operator combines two input data sources into a single output stream,
    /// using a pairwise element matching algorithm. For example, the result of zipping
    /// two vectors a = {0, 1, 2, 3} and b = {9, 8, 7, 6} is the vector of pairs, 
    /// c = {(0,9), (1,8), (2,7), (3,6)}. Because the expectation is that each element
    /// is matched with the element in the other data source at the same ordinal 
    /// position, the zip operator requires order preservation. 
    /// 
    ///  
    /// 
    /// 
    internal sealed class ZipQueryOperator
        : QueryOperator 
    {
 
        private readonly Func m_resultSelector; // To select result elements. 
        private readonly QueryOperator m_leftChild;
        private readonly QueryOperator m_rightChild; 
        private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source
        private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source

        //---------------------------------------------------------------------------------------- 
        // Initializes a new zip operator.
        // 
        // Arguments: 
        //    leftChild     - the left data source from which to pull data.
        //    rightChild    - the right data source from which to pull data. 
        //

        internal ZipQueryOperator(
            ParallelQuery leftChildSource, IEnumerable rightChildSource, 
            Func resultSelector)
            :this( 
                QueryOperator.AsQueryOperator(leftChildSource), 
                QueryOperator.AsQueryOperator(rightChildSource),
                resultSelector) 
        {
        }

        private ZipQueryOperator( 
            QueryOperator left, QueryOperator right,
            Func resultSelector) 
            : base(left.SpecifiedQuerySettings.Merge(right.SpecifiedQuerySettings)) 
        {
            Contract.Assert(resultSelector != null, "operator cannot be null"); 

            m_leftChild = left;
            m_rightChild = right;
            m_resultSelector = resultSelector; 
            m_outputOrdered = m_leftChild.OutputOrdered || m_rightChild.OutputOrdered;
 
            m_prematureMergeLeft = m_leftChild.OrdinalIndexState != OrdinalIndexState.Indexible; 
            m_prematureMergeRight = m_rightChild.OrdinalIndexState != OrdinalIndexState.Indexible;
        } 

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the children and wrapping them with
        // partitions as needed. 
        //
 
        internal override QueryResults Open(QuerySettings settings, bool preferStriping) 
        {
            // We just open our child operators, left and then right. 
            QueryResults leftChildResults = m_leftChild.Open(settings, preferStriping);
            QueryResults rightChildResults = m_rightChild.Open(settings, preferStriping);

            int partitionCount = settings.DegreeOfParallelism.Value; 
            if (m_prematureMergeLeft)
            { 
                PartitionedStreamMerger merger = new PartitionedStreamMerger( 
                    false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_leftChild.OutputOrdered,
                    settings.CancellationState, settings.QueryId); 
                leftChildResults.GivePartitionedStream(merger);
                leftChildResults = new ListQueryResults(
                    merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping);
            } 

            if (m_prematureMergeRight) 
            { 
                PartitionedStreamMerger merger = new PartitionedStreamMerger(
                    false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_rightChild.OutputOrdered, 
                    settings.CancellationState, settings.QueryId);
                rightChildResults.GivePartitionedStream(merger);
                rightChildResults = new ListQueryResults(
                    merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping); 
            }
 
            return new ZipQueryOperatorResults(leftChildResults, rightChildResults, m_resultSelector, partitionCount, preferStriping); 
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
 
        internal override IEnumerable AsSequentialQuery(CancellationToken token)
        { 
            using(IEnumerator leftEnumerator = m_leftChild.AsSequentialQuery(token).GetEnumerator()) 
            using(IEnumerator rightEnumerator = m_rightChild.AsSequentialQuery(token).GetEnumerator())
            { 
                while(leftEnumerator.MoveNext() && rightEnumerator.MoveNext())
                {
                    yield return m_resultSelector(leftEnumerator.Current, rightEnumerator.Current);
                } 
            }
        } 
 
        //---------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator. 
        //

        internal override OrdinalIndexState OrdinalIndexState
        { 
            get
            { 
                return OrdinalIndexState.Indexible; 
            }
        } 

        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        // 

        internal override bool LimitsParallelism 
        { 
            get
            { 
                return m_prematureMergeLeft || m_prematureMergeRight;
            }
        }
 
        //---------------------------------------------------------------------------------------
        // A special QueryResults class for the Zip operator. It requires that both of the child 
        // QueryResults are indexible. 
        //
 
        internal class ZipQueryOperatorResults : QueryResults
        {
            private readonly QueryResults m_leftChildResults;
            private readonly QueryResults m_rightChildResults; 
            private readonly Func m_resultSelector; // To select result elements.
            private readonly int m_count; 
            private readonly int m_partitionCount; 
            private readonly bool m_preferStriping;
 
            internal ZipQueryOperatorResults(
                QueryResults leftChildResults, QueryResults rightChildResults,
                Func resultSelector, int partitionCount, bool preferStriping)
            { 
                m_leftChildResults = leftChildResults;
                m_rightChildResults = rightChildResults; 
                m_resultSelector = resultSelector; 
                m_partitionCount = partitionCount;
                m_preferStriping = preferStriping; 

                Contract.Assert(m_leftChildResults.IsIndexible);
                Contract.Assert(m_rightChildResults.IsIndexible);
 
                m_count = Math.Min(m_leftChildResults.Count, m_rightChildResults.Count);
            } 
 
            internal override int ElementsCount
            { 
                get { return m_count; }
            }

            internal override bool IsIndexible 
            {
                get { return true; } 
            } 

            internal override TOutput GetElement(int index) 
            {
                return m_resultSelector(m_leftChildResults.GetElement(index), m_rightChildResults.GetElement(index));
            }
 
            internal override void GivePartitionedStream(IPartitionedStreamRecipient recipient)
            { 
                PartitionedStream partitionedStream = ExchangeUtilities.PartitionDataSource(this, m_partitionCount, m_preferStriping); 
                recipient.Receive(partitionedStream);
            } 
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// ZipQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// A Zip operator combines two input data sources into a single output stream,
    /// using a pairwise element matching algorithm. For example, the result of zipping
    /// two vectors a = {0, 1, 2, 3} and b = {9, 8, 7, 6} is the vector of pairs, 
    /// c = {(0,9), (1,8), (2,7), (3,6)}. Because the expectation is that each element
    /// is matched with the element in the other data source at the same ordinal 
    /// position, the zip operator requires order preservation. 
    /// 
    ///  
    /// 
    /// 
    internal sealed class ZipQueryOperator
        : QueryOperator 
    {
 
        private readonly Func m_resultSelector; // To select result elements. 
        private readonly QueryOperator m_leftChild;
        private readonly QueryOperator m_rightChild; 
        private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source
        private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source

        //---------------------------------------------------------------------------------------- 
        // Initializes a new zip operator.
        // 
        // Arguments: 
        //    leftChild     - the left data source from which to pull data.
        //    rightChild    - the right data source from which to pull data. 
        //

        internal ZipQueryOperator(
            ParallelQuery leftChildSource, IEnumerable rightChildSource, 
            Func resultSelector)
            :this( 
                QueryOperator.AsQueryOperator(leftChildSource), 
                QueryOperator.AsQueryOperator(rightChildSource),
                resultSelector) 
        {
        }

        private ZipQueryOperator( 
            QueryOperator left, QueryOperator right,
            Func resultSelector) 
            : base(left.SpecifiedQuerySettings.Merge(right.SpecifiedQuerySettings)) 
        {
            Contract.Assert(resultSelector != null, "operator cannot be null"); 

            m_leftChild = left;
            m_rightChild = right;
            m_resultSelector = resultSelector; 
            m_outputOrdered = m_leftChild.OutputOrdered || m_rightChild.OutputOrdered;
 
            m_prematureMergeLeft = m_leftChild.OrdinalIndexState != OrdinalIndexState.Indexible; 
            m_prematureMergeRight = m_rightChild.OrdinalIndexState != OrdinalIndexState.Indexible;
        } 

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the children and wrapping them with
        // partitions as needed. 
        //
 
        internal override QueryResults Open(QuerySettings settings, bool preferStriping) 
        {
            // We just open our child operators, left and then right. 
            QueryResults leftChildResults = m_leftChild.Open(settings, preferStriping);
            QueryResults rightChildResults = m_rightChild.Open(settings, preferStriping);

            int partitionCount = settings.DegreeOfParallelism.Value; 
            if (m_prematureMergeLeft)
            { 
                PartitionedStreamMerger merger = new PartitionedStreamMerger( 
                    false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_leftChild.OutputOrdered,
                    settings.CancellationState, settings.QueryId); 
                leftChildResults.GivePartitionedStream(merger);
                leftChildResults = new ListQueryResults(
                    merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping);
            } 

            if (m_prematureMergeRight) 
            { 
                PartitionedStreamMerger merger = new PartitionedStreamMerger(
                    false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_rightChild.OutputOrdered, 
                    settings.CancellationState, settings.QueryId);
                rightChildResults.GivePartitionedStream(merger);
                rightChildResults = new ListQueryResults(
                    merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping); 
            }
 
            return new ZipQueryOperatorResults(leftChildResults, rightChildResults, m_resultSelector, partitionCount, preferStriping); 
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
 
        internal override IEnumerable AsSequentialQuery(CancellationToken token)
        { 
            using(IEnumerator leftEnumerator = m_leftChild.AsSequentialQuery(token).GetEnumerator()) 
            using(IEnumerator rightEnumerator = m_rightChild.AsSequentialQuery(token).GetEnumerator())
            { 
                while(leftEnumerator.MoveNext() && rightEnumerator.MoveNext())
                {
                    yield return m_resultSelector(leftEnumerator.Current, rightEnumerator.Current);
                } 
            }
        } 
 
        //---------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator. 
        //

        internal override OrdinalIndexState OrdinalIndexState
        { 
            get
            { 
                return OrdinalIndexState.Indexible; 
            }
        } 

        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        // 

        internal override bool LimitsParallelism 
        { 
            get
            { 
                return m_prematureMergeLeft || m_prematureMergeRight;
            }
        }
 
        //---------------------------------------------------------------------------------------
        // A special QueryResults class for the Zip operator. It requires that both of the child 
        // QueryResults are indexible. 
        //
 
        internal class ZipQueryOperatorResults : QueryResults
        {
            private readonly QueryResults m_leftChildResults;
            private readonly QueryResults m_rightChildResults; 
            private readonly Func m_resultSelector; // To select result elements.
            private readonly int m_count; 
            private readonly int m_partitionCount; 
            private readonly bool m_preferStriping;
 
            internal ZipQueryOperatorResults(
                QueryResults leftChildResults, QueryResults rightChildResults,
                Func resultSelector, int partitionCount, bool preferStriping)
            { 
                m_leftChildResults = leftChildResults;
                m_rightChildResults = rightChildResults; 
                m_resultSelector = resultSelector; 
                m_partitionCount = partitionCount;
                m_preferStriping = preferStriping; 

                Contract.Assert(m_leftChildResults.IsIndexible);
                Contract.Assert(m_rightChildResults.IsIndexible);
 
                m_count = Math.Min(m_leftChildResults.Count, m_rightChildResults.Count);
            } 
 
            internal override int ElementsCount
            { 
                get { return m_count; }
            }

            internal override bool IsIndexible 
            {
                get { return true; } 
            } 

            internal override TOutput GetElement(int index) 
            {
                return m_resultSelector(m_leftChildResults.GetElement(index), m_rightChildResults.GetElement(index));
            }
 
            internal override void GivePartitionedStream(IPartitionedStreamRecipient recipient)
            { 
                PartitionedStream partitionedStream = ExchangeUtilities.PartitionDataSource(this, m_partitionCount, m_preferStriping); 
                recipient.Receive(partitionedStream);
            } 
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK