SingleQueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / SingleQueryOperator.cs / 1305376 / SingleQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// SingleQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Threading;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// Single searches the input to find the sole element that satisfies the (optional)
    /// predicate.  If multiple such elements are found, the caller is responsible for
    /// producing an error.  There is some degree of cross-partition synchronization to
    /// proactively halt the search if we ever determine there are multiple elements
    /// satisfying the search in the input.
    /// </summary>
    ///
    internal sealed class SingleQueryOperator : UnaryQueryOperator 
    {
        // NOTE(review): members below reference TSource and TKey, but no type-parameter list is
        // visible on any declaration in this listing; the generic arguments were presumably
        // stripped when the file was extracted -- confirm against the original source.

        private readonly Func m_predicate; // The optional predicate used during the search.
 
        //----------------------------------------------------------------------------------------
        // Initializes a new Single operator. 
        // 
        // Arguments:
        //     child                - the child whose data we will search
        //     predicate            - the optional match predicate; when null, every element
        //                            is treated as a match (see the enumerator's MoveNext)
        //

        internal SingleQueryOperator(IEnumerable child, Func predicate)
            :base(child) 
        {
            Contract.Assert(child != null, "child data source cannot be null"); 
            m_predicate = predicate; 
        }
 
        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        // 
        // The child is always opened with preferStriping == false; the caller's preferStriping
        // value is only forwarded to the results wrapper created for this operator.
        //

        internal override QueryResults Open( 
            QuerySettings settings, bool preferStriping) 
        {
            QueryResults childQueryResults = Child.Open(settings, false); 
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps each input partition in a SingleQueryOperatorEnumerator and hands the resulting
        // stream to the recipient.
        //
        // The output stream has the same partition count as the input and is marked
        // OrdinalIndexState.Shuffled.  A single Shared counter instance is passed to every
        // partition's enumerator, which is what lets one partition observe matches found by
        // the others.  The preferStriping and settings arguments are not used by this method.
        //

        internal override void WrapPartitionedStream( 
            PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings)
        { 
            int partitionCount = inputStream.PartitionCount; 
            PartitionedStream outputStream = new PartitionedStream(
                partitionCount, Util.GetDefaultComparer(), OrdinalIndexState.Shuffled); 

            // One counter for the whole query, starting at zero and shared by all partitions.
            Shared totalElementCount = new Shared(0);
            for (int i = 0; i < partitionCount; i++)
            { 
                outputStream[i] = new SingleQueryOperatorEnumerator(inputStream[i], m_predicate, totalElementCount);
            } 
 
            recipient.Receive(outputStream);
        } 

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        // 
        // Not supported for this operator: the method asserts and then unconditionally throws
        // NotSupportedException.
        //

        internal override IEnumerable AsSequentialQuery(CancellationToken token) 
        { 
            Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
            throw new NotSupportedException(); 
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge. 
        //
        // Always false for this operator.
        //
 
        internal override bool LimitsParallelism 
        {
            get { return false; } 
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the Single operation. 
        //
        // One instance is created per input partition.  Each instance yields at most two
        // elements: one if it saw a single match in its partition, two if it saw a second
        // match, and none if it saw no match before its scan ended.
        //
 
        class SingleQueryOperatorEnumerator : QueryOperatorEnumerator 
        {
 
            private QueryOperatorEnumerator m_source; // The data source to enumerate.
            private Func m_predicate; // The optional predicate used during the search.
            private bool m_alreadySearched; // Whether we have searched our input already.
            private bool m_yieldExtra; // Whether this enumerator found more than one element. 

            // Data shared among partitions. 
            private Shared m_totalElementCount; // The total count of elements found. 

            //--------------------------------------------------------------------------------------- 
            // Instantiates a new enumerator.
            //
            // Arguments:
            //     source               - the partition to scan; must not be null
            //     predicate            - the optional match predicate; may be null
            //     totalElementCount    - the match counter shared with the other partitions;
            //                            must not be null
            //

            internal SingleQueryOperatorEnumerator(QueryOperatorEnumerator source, 
                                                   Func predicate, Shared totalElementCount)
            { 
                Contract.Assert(source != null); 
                Contract.Assert(totalElementCount != null);
 
                m_source = source;
                m_predicate = predicate;
                m_totalElementCount = totalElementCount;
            } 

            //---------------------------------------------------------------------------------------- 
            // Advances the enumerator.
            //
            // The first call scans the source partition and returns true if at least one match
            // was seen, with currentElement set to the most recent match and currentKey set
            // to 0.  If a second match was seen during that scan, the next call returns true
            // once more with currentElement set to default(TSource), so that the consumer sees
            // more than one element.  Every call after that returns false.
            //
 
            internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
            {
                Contract.Assert(m_source != null);
 
                if (m_alreadySearched)
                { 
                    // If we've already searched, we will "fake out" the caller by returning an extra 
                    // element at the end in the case that we've found more than one element.
                    if (m_yieldExtra) 
                    {
                        // Clear the flag so the extra element is produced only once.
                        m_yieldExtra = false;
                        currentElement = default(TSource);
                        currentKey = 0; 
                        return true;
                    } 
 
                    return false;
                } 

                // Scan our input, looking for a match.
                bool found = false;
                TSource current = default(TSource); 
                TKey keyUnused = default(TKey);
 
                while (m_source.MoveNext(ref current, ref keyUnused)) 
                {
                    // If the predicate is null or the current element satisfies it, we will remember 
                    // it so that we can yield it later.  We then proceed with scanning the input
                    // in case there are multiple such elements.
                    if (m_predicate == null || m_predicate(current))
                    { 
                        // Notify other partitions.
                        Interlocked.Increment(ref m_totalElementCount.Value); 
 
                        currentElement = current;
                        currentKey = 0; 

                        if (found)
                        {
                            // Already found an element previously, we can exit. 
                            m_yieldExtra = true;
                            break; 
                        } 
                        else
                        { 
                            found = true;
                        }
                    }
 
                    // If we've already determined there is more than one matching element in the
                    // data source, we can exit right away. 
                    // This check runs after every source element, matching or not, so matches
                    // counted by other partitions also end this scan early.
                    if (m_totalElementCount.Value > 1) 
                    {
                        break; 
                    }
                }
                m_alreadySearched = true;
 
                return found;
            } 
 
            //----------------------------------------------------------------------------------------
            // Disposes the underlying source enumerator.  The disposing flag is not consulted.
            //

            protected override void Dispose(bool disposing)
            { 
                m_source.Dispose();
            }
        }
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// SingleQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Threading;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// Single searches the input to find the sole element that satisfies the (optional)
    /// predicate.  If multiple such elements are found, the caller is responsible for
    /// producing an error.  There is some degree of cross-partition synchronization to
    /// proactively halt the search if we ever determine there are multiple elements
    /// satisfying the search in the input.
    /// </summary>
    ///
    internal sealed class SingleQueryOperator : UnaryQueryOperator 
    {
        // NOTE(review): members below reference TSource and TKey, but no type-parameter list is
        // visible on any declaration in this listing; the generic arguments were presumably
        // stripped when the file was extracted -- confirm against the original source.

        private readonly Func m_predicate; // The optional predicate used during the search.
 
        //----------------------------------------------------------------------------------------
        // Initializes a new Single operator. 
        // 
        // Arguments:
        //     child                - the child whose data we will search
        //     predicate            - the optional match predicate; when null, every element
        //                            is treated as a match (see the enumerator's MoveNext)
        //

        internal SingleQueryOperator(IEnumerable child, Func predicate)
            :base(child) 
        {
            Contract.Assert(child != null, "child data source cannot be null"); 
            m_predicate = predicate; 
        }
 
        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        // 
        // The child is always opened with preferStriping == false; the caller's preferStriping
        // value is only forwarded to the results wrapper created for this operator.
        //

        internal override QueryResults Open( 
            QuerySettings settings, bool preferStriping) 
        {
            QueryResults childQueryResults = Child.Open(settings, false); 
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps each input partition in a SingleQueryOperatorEnumerator and hands the resulting
        // stream to the recipient.
        //
        // The output stream has the same partition count as the input and is marked
        // OrdinalIndexState.Shuffled.  A single Shared counter instance is passed to every
        // partition's enumerator, which is what lets one partition observe matches found by
        // the others.  The preferStriping and settings arguments are not used by this method.
        //

        internal override void WrapPartitionedStream( 
            PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings)
        { 
            int partitionCount = inputStream.PartitionCount; 
            PartitionedStream outputStream = new PartitionedStream(
                partitionCount, Util.GetDefaultComparer(), OrdinalIndexState.Shuffled); 

            // One counter for the whole query, starting at zero and shared by all partitions.
            Shared totalElementCount = new Shared(0);
            for (int i = 0; i < partitionCount; i++)
            { 
                outputStream[i] = new SingleQueryOperatorEnumerator(inputStream[i], m_predicate, totalElementCount);
            } 
 
            recipient.Receive(outputStream);
        } 

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        // 
        // Not supported for this operator: the method asserts and then unconditionally throws
        // NotSupportedException.
        //

        internal override IEnumerable AsSequentialQuery(CancellationToken token) 
        { 
            Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
            throw new NotSupportedException(); 
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge. 
        //
        // Always false for this operator.
        //
 
        internal override bool LimitsParallelism 
        {
            get { return false; } 
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the Single operation. 
        //
        // One instance is created per input partition.  Each instance yields at most two
        // elements: one if it saw a single match in its partition, two if it saw a second
        // match, and none if it saw no match before its scan ended.
        //
 
        class SingleQueryOperatorEnumerator : QueryOperatorEnumerator 
        {
 
            private QueryOperatorEnumerator m_source; // The data source to enumerate.
            private Func m_predicate; // The optional predicate used during the search.
            private bool m_alreadySearched; // Whether we have searched our input already.
            private bool m_yieldExtra; // Whether this enumerator found more than one element. 

            // Data shared among partitions. 
            private Shared m_totalElementCount; // The total count of elements found. 

            //--------------------------------------------------------------------------------------- 
            // Instantiates a new enumerator.
            //
            // Arguments:
            //     source               - the partition to scan; must not be null
            //     predicate            - the optional match predicate; may be null
            //     totalElementCount    - the match counter shared with the other partitions;
            //                            must not be null
            //

            internal SingleQueryOperatorEnumerator(QueryOperatorEnumerator source, 
                                                   Func predicate, Shared totalElementCount)
            { 
                Contract.Assert(source != null); 
                Contract.Assert(totalElementCount != null);
 
                m_source = source;
                m_predicate = predicate;
                m_totalElementCount = totalElementCount;
            } 

            //---------------------------------------------------------------------------------------- 
            // Advances the enumerator.
            //
            // The first call scans the source partition and returns true if at least one match
            // was seen, with currentElement set to the most recent match and currentKey set
            // to 0.  If a second match was seen during that scan, the next call returns true
            // once more with currentElement set to default(TSource), so that the consumer sees
            // more than one element.  Every call after that returns false.
            //
 
            internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
            {
                Contract.Assert(m_source != null);
 
                if (m_alreadySearched)
                { 
                    // If we've already searched, we will "fake out" the caller by returning an extra 
                    // element at the end in the case that we've found more than one element.
                    if (m_yieldExtra) 
                    {
                        // Clear the flag so the extra element is produced only once.
                        m_yieldExtra = false;
                        currentElement = default(TSource);
                        currentKey = 0; 
                        return true;
                    } 
 
                    return false;
                } 

                // Scan our input, looking for a match.
                bool found = false;
                TSource current = default(TSource); 
                TKey keyUnused = default(TKey);
 
                while (m_source.MoveNext(ref current, ref keyUnused)) 
                {
                    // If the predicate is null or the current element satisfies it, we will remember 
                    // it so that we can yield it later.  We then proceed with scanning the input
                    // in case there are multiple such elements.
                    if (m_predicate == null || m_predicate(current))
                    { 
                        // Notify other partitions.
                        Interlocked.Increment(ref m_totalElementCount.Value); 
 
                        currentElement = current;
                        currentKey = 0; 

                        if (found)
                        {
                            // Already found an element previously, we can exit. 
                            m_yieldExtra = true;
                            break; 
                        } 
                        else
                        { 
                            found = true;
                        }
                    }
 
                    // If we've already determined there is more than one matching element in the
                    // data source, we can exit right away. 
                    // This check runs after every source element, matching or not, so matches
                    // counted by other partitions also end this scan early.
                    if (m_totalElementCount.Value > 1) 
                    {
                        break; 
                    }
                }
                m_alreadySearched = true;
 
                return found;
            } 
 
            //----------------------------------------------------------------------------------------
            // Disposes the underlying source enumerator.  The disposing flag is not consulted.
            //

            protected override void Dispose(bool disposing)
            { 
                m_source.Dispose();
            }
        }
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK