ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [26/51] [partial] ignite git commit: IGNITE-1513: Finalized build procedure.
Date Tue, 22 Sep 2015 15:02:00 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
deleted file mode 100644
index dfe0d18..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Collections.ObjectModel;
-    using System.Diagnostics;
-    using System.Diagnostics.CodeAnalysis;
-    using System.Linq;
-    using Apache.Ignite.Core.Cluster;
-    using Apache.Ignite.Core.Common;
-    using Apache.Ignite.Core.Compute;
-    using Apache.Ignite.Core.Impl.Cluster;
-    using Apache.Ignite.Core.Impl.Common;
-    using Apache.Ignite.Core.Impl.Compute.Closure;
-    using Apache.Ignite.Core.Impl.Memory;
-    using Apache.Ignite.Core.Impl.Portable;
-    using Apache.Ignite.Core.Impl.Resource;
-
-    /// <summary>
-    /// Non-generic compute task holder interface, used to avoid generics at call sites.
-    /// </summary>
-    internal interface IComputeTaskHolder
-    {
-        /// <summary>
-        /// Perform map step.
-        /// </summary>
-        /// <param name="inStream">Stream with IN data (topology info).</param>
-        /// <param name="outStream">Stream for OUT data (map result).</param>
-        /// <remarks>Nothing is returned; the map result is written to <paramref name="outStream"/>.</remarks>
-        void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream);
-
-        /// <summary>
-        /// Process local job result.
-        /// </summary>
-        /// <param name="jobId">Holder of the job whose result is processed.</param>
-        /// <returns>Job result policy as an integer.</returns>
-        int JobResultLocal(ComputeJobHolder jobId);
-
-        /// <summary>
-        /// Process remote job result.
-        /// </summary>
-        /// <param name="jobId">Holder of the job whose result is processed.</param>
-        /// <param name="stream">Stream with the marshalled job result.</param>
-        /// <returns>Job result policy as an integer.</returns>
-        int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream);
-        
-        /// <summary>
-        /// Perform task reduce.
-        /// </summary>
-        void Reduce();
-
-        /// <summary>
-        /// Complete task.
-        /// </summary>
-        /// <param name="taskHandle">Task handle.</param>
-        void Complete(long taskHandle);
-        
-        /// <summary>
-        /// Complete task with error.
-        /// </summary>
-        /// <param name="taskHandle">Task handle.</param>
-        /// <param name="stream">Stream with serialized exception.</param>
-        void CompleteWithError(long taskHandle, PlatformMemoryStream stream);
-    }
-
-    /// <summary>
-    /// Compute task holder.
-    /// </summary>
-    internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder
-    {
-        /** Empty results. */
-        private static readonly IList<IComputeJobResult<T>> EmptyRes =     
-            new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>());
-
-        /** Compute instance. */
-        private readonly ComputeImpl _compute;
-
-        /** Actual task. */
-        private readonly IComputeTask<TA, T, TR> _task;
-
-        /** Task argument. */
-        private readonly TA _arg;
-
-        /** Results cache flag. */
-        private readonly bool _resCache;
-
-        /** Task future. */
-        private readonly Future<TR> _fut = new Future<TR>();
-                
-        /** Jobs whose results are cached. */
-        private ISet<object> _resJobs;
-
-        /** Cached results. */
-        private IList<IComputeJobResult<T>> _ress;
-
-        /** Handles for jobs which are not serialized right away. */
-        private volatile List<long> _jobHandles;
-        
-        /// <summary>
-        /// Constructor. Injects the grid into the task and reads the task's result caching setting.
-        /// </summary>
-        /// <param name="grid">Grid to inject into the task.</param>
-        /// <param name="compute">Compute instance kept by the holder.</param>
-        /// <param name="task">Task to be executed.</param>
-        /// <param name="arg">Argument later passed to the task's map step.</param>
-        public ComputeTaskHolder(Ignite grid, ComputeImpl compute, IComputeTask<TA, T, TR> task, TA arg)
-        {
-            _compute = compute;
-            _arg = arg;
-            _task = task;
-
-            ResourceTypeDescriptor resDesc = ResourceProcessor.Descriptor(task.GetType());
-
-            IComputeResourceInjector injector = task as IComputeResourceInjector;
-
-            if (injector != null)
-                injector.Inject(grid);
-            else
-                resDesc.InjectIgnite(task, grid);
-
-            _resCache = !resDesc.TaskNoResultCache;
-        }
-
-        /** Perform map step: read topology from the input stream, run the task's Map, write the outcome. */
-        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
-            Justification = "User code can throw any exception")]
-        public void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream)
-        {
-            IList<IClusterNode> subgrid;
-
-            ClusterGroupImpl prj = (ClusterGroupImpl)_compute.ClusterGroup;
-
-            var ignite = (Ignite) prj.Ignite;
-
-            // 1. Unmarshal topology info if the leading flag says the topology has changed.
-            var reader = prj.Marshaller.StartUnmarshal(inStream);
-
-            if (reader.ReadBoolean())
-            {
-                long topVer = reader.ReadLong();
-
-                List<IClusterNode> nodes = new List<IClusterNode>(reader.ReadInt());
-
-                int nodesCnt = reader.ReadInt();
-
-                subgrid = new List<IClusterNode>(nodesCnt);
-
-                for (int i = 0; i < nodesCnt; i++)
-                {
-                    IClusterNode node = ignite.GetNode(reader.ReadGuid());
-
-                    nodes.Add(node);
-
-                    if (reader.ReadBoolean())
-                        subgrid.Add(node);
-                }
-
-                // Update parent projection to help other task callers avoid this overhead.
-                // Note that there is a chance that topology changed even further and this update fails.
-                // It means that some of subgrid nodes could have left the Grid. This is not critical
-                // for us, because Java will handle it gracefully.
-                prj.UpdateTopology(topVer, nodes);
-            }
-            else
-            {
-                IList<IClusterNode> nodes = prj.NodesNoRefresh();
-
-                Debug.Assert(nodes != null, "At least one topology update should have occurred.");
-
-                subgrid = IgniteUtils.Shuffle(nodes);
-            }
-
-            // 2. Perform map (user code); any exception completes the task future with that error.
-            IDictionary<IComputeJob<T>, IClusterNode> map;
-            Exception err;
-
-            try
-            {
-                map = _task.Map(subgrid, _arg);
-
-                err = null;
-            }
-            catch (Exception e)
-            {
-                map = null;
-
-                err = e;
-
-                // Java can receive another exception in case of marshalling failure but it is not important.
-                Finish(default(TR), e);
-            }
-
-            // 3. Write map result to the output stream.
-            PortableWriterImpl writer = prj.Marshaller.StartMarshal(outStream);
-
-            try
-            {
-                if (err == null)
-                {
-                    writer.WriteBoolean(true); // Success flag.
-
-                    if (map == null)
-                        writer.WriteBoolean(false); // Map produced no result.
-                    else
-                    {
-                        writer.WriteBoolean(true); // Map produced result.
-                        writer.WriteInt(map.Count); // Amount of mapped jobs.
-
-                        var jobHandles = new List<long>(map.Count);
-
-                        foreach (KeyValuePair<IComputeJob<T>, IClusterNode> mapEntry in map)
-                        {
-                            var job = new ComputeJobHolder(_compute.ClusterGroup.Ignite as Ignite, mapEntry.Key.ToNonGeneric());
-
-                            IClusterNode node = mapEntry.Value;
-
-                            var jobHandle = ignite.HandleRegistry.Allocate(job);
-
-                            jobHandles.Add(jobHandle);
-
-                            writer.WriteLong(jobHandle);
-
-                            if (node.IsLocal)
-                                writer.WriteBoolean(false); // Job is not serialized.
-                            else
-                            {
-                                writer.WriteBoolean(true); // Job is serialized.
-                                writer.WriteObject(job);
-                            }
-
-                            writer.WriteGuid(node.Id);
-                        }
-
-                        _jobHandles = jobHandles;
-                    }
-                }
-                else
-                {
-                    writer.WriteBoolean(false); // Map failed.
-
-                    // Write error as string because it is not important for Java, we need only to print
-                    // a message in the log.
-                    writer.WriteString("Map step failed [errType=" + err.GetType().Name +
-                        ", errMsg=" + err.Message + ']');
-                }
-            }
-            catch (Exception e)
-            {
-                // Marshaling failed. NOTE(review): allocated job handles are not in _jobHandles yet; verify release.
-                Finish(default(TR), e);
-
-                outStream.Reset();
-                
-                writer.WriteBoolean(false); // Map failed.
-                writer.WriteString(e.Message); // Write error message.
-            }
-            finally
-            {
-                prj.Marshaller.FinishMarshal(writer);
-            }
-        }
-
-        /** Process local job result: passes the holder's own result to the common result handler. */
-        public int JobResultLocal(ComputeJobHolder job)
-        {
-            return (int)JobResult0(job.JobResult);
-        }
-
-        /** Process remote job result: unmarshals it from the stream and passes it to the common handler. */
-        [SuppressMessage("ReSharper", "PossibleInvalidOperationException")]
-        public int JobResultRemote(ComputeJobHolder job, PlatformMemoryStream stream)
-        {
-            // 1. Unmarshal result: node ID and cancellation flag first, then the wrapped invocation result.
-            PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream);
-
-            Guid nodeId = reader.ReadGuid().Value;
-            bool cancelled = reader.ReadBoolean();
-
-            try
-            {
-                object err;
-
-                var data = PortableUtils.ReadWrappedInvocationResult(reader, out err);
-
-                // 2. Process the result.
-                return (int) JobResult0(new ComputeJobResultImpl(data, (Exception) err, job.Job, nodeId, cancelled));
-            }
-            catch (Exception e)
-            {
-                Finish(default(TR), e);
-
-                if (!(e is IgniteException))
-                    throw new IgniteException("Failed to process job result: " + e.Message, e);
-
-                throw;
-            }
-        }
-        
-        /** Perform task reduce over cached results (or an empty list when caching is off) and finish. */
-        public void Reduce()
-        {
-            try
-            {
-                TR taskRes = _task.Reduce(_resCache ? _ress : EmptyRes);
-
-                Finish(taskRes, null);
-            }
-            catch (Exception e)
-            {
-                Finish(default(TR), e);
-
-                if (!(e is IgniteException))
-                    throw new IgniteException("Failed to reduce task: " + e.Message, e);
-
-                throw;
-            }
-        }
-
-        /** Complete task: releases the task handle together with any recorded job handles. */
-        public void Complete(long taskHandle)
-        {
-            Clean(taskHandle);
-        }
-
-        /// <summary>
-        /// Complete task with error: finishes the task future with the error, then releases handles.
-        /// </summary>
-        /// <param name="taskHandle">Task handle.</param>
-        /// <param name="e">Error passed to the task future.</param>
-        public void CompleteWithError(long taskHandle, Exception e)
-        {
-            Finish(default(TR), e);
-
-            Clean(taskHandle);
-        }
-
-        /** Complete task with an error read from the stream; an unreadable error is wrapped instead. */
-        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
-            Justification = "User object deserialization can throw any exception")]
-        public void CompleteWithError(long taskHandle, PlatformMemoryStream stream)
-        {
-            PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream);
-
-            Exception err;
-
-            try
-            {
-                if (reader.ReadBoolean())
-                {
-                    PortableResultWrapper res = reader.ReadObject<PortableUserObject>()
-                        .Deserialize<PortableResultWrapper>();
-
-                    err = (Exception) res.Result;
-                }
-                else
-                    err = ExceptionUtils.GetException(reader.ReadString(), reader.ReadString());
-            }
-            catch (Exception e)
-            {
-                err = new IgniteException("Task completed with error, but it cannot be unmarshalled: " + e.Message, e);
-            }
-
-            CompleteWithError(taskHandle, err);
-        }
-
-        /// <summary>
-        /// Task completion future; it is completed by Finish with either a result or an error.
-        /// </summary>
-        internal IFuture<TR> Future
-        {
-            get { return _fut; }
-        }
-
-        /// <summary>
-        /// Manually set job handles. Used by closures because they have separate flow for map step.
-        /// </summary>
-        /// <param name="jobHandles">Job handles to release when the task is cleaned up.</param>
-        internal void JobHandles(List<long> jobHandles)
-        {
-            _jobHandles = jobHandles;
-        }
-
-        /// <summary>
-        /// Process job result: invoke the task's Result callback and, when caching is on, store the result.
-        /// </summary>
-        /// <param name="res">Job result to hand to the task.</param>
-        private ComputeJobResultPolicy JobResult0(IComputeJobResult<object> res)
-        {
-            try
-            {
-                IList<IComputeJobResult<T>> ress0;
-
-                // 1. Prepare old results; the cache collections are created on first use.
-                if (_resCache)
-                {
-                    if (_resJobs == null)
-                    {
-                        _resJobs = new HashSet<object>();
-
-                        _ress = new List<IComputeJobResult<T>>();
-                    }
-
-                    ress0 = _ress;
-                }
-                else
-                    ress0 = EmptyRes;
-
-                // 2. Invoke user code.
-                var policy = _task.Result(new ComputeJobResultGenericWrapper<T>(res), ress0);
-
-                // 3. Cache the result only if user code did not throw.
-                if (_resCache)
-                {
-                    var job = res.Job().Unwrap();
-
-                    if (!_resJobs.Add(job))
-                    {
-                        // Duplicate => drop old result. NOTE(review): confirm item.Job() is the unwrapped job instance.
-                        var oldRes = _ress.Single(item => item.Job() == job);
-
-                        _ress.Remove(oldRes);
-                    }
-
-                    _ress.Add(new ComputeJobResultGenericWrapper<T>(res));
-                }
-
-                return policy;
-            }
-            catch (Exception e)
-            {
-                Finish(default(TR), e);
-
-                if (!(e is IgniteException))
-                    throw new IgniteException("Failed to process job result: " + e.Message, e);
-
-                throw;
-            }
-        }
-
-        /// <summary>
-        /// Finish task by completing the task future with the given result or error.
-        /// </summary>
-        /// <param name="res">Result.</param>
-        /// <param name="err">Error, or null when the task produced a result.</param>
-        private void Finish(TR res, Exception err)
-        {
-            _fut.OnDone(res, err);
-        }
-
-        /// <summary>
-        /// Clean-up task resources: release recorded job handles (if any), then the task handle.
-        /// </summary>
-        /// <param name="taskHandle">Handle of the task to release.</param>
-        private void Clean(long taskHandle)
-        {
-            var handles = _jobHandles;
-
-            var handleRegistry = _compute.Marshaller.Ignite.HandleRegistry;
-
-            if (handles != null)
-                foreach (var handle in handles) 
-                    handleRegistry.Release(handle, true);
-
-            handleRegistry.Release(taskHandle, true);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
deleted file mode 100644
index cbd26dd..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Datastream
-{
-    using System;
-    using System.Collections.Concurrent;
-    using System.Collections.Generic;
-    using System.Diagnostics.CodeAnalysis;
-    using System.Threading;
-    using Apache.Ignite.Core.Common;
-    using Apache.Ignite.Core.Impl.Common;
-    using Apache.Ignite.Core.Impl.Portable;
-
-    /// <summary>
-    /// Data streamer batch.
-    /// </summary>
-    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
-    internal class DataStreamerBatch<TK, TV>
-    {
-        /** Queue. */
-        private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();
-
-        /** Lock for concurrency. */
-        private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
-
-        /** Previous batch. */
-        private volatile DataStreamerBatch<TK, TV> _prev;
-
-        /** Current queue size.*/
-        private volatile int _size;
-        
-        /** Send guard. */
-        private bool _sndGuard;
-
-        /** */
-        private readonly Future<object> _fut = new Future<object>();
-
-        /// <summary>
-        /// Constructor for a batch that has no previous batch.
-        /// </summary>
-        public DataStreamerBatch() : this(null)
-        {
-            // No-op.
-        }
-
-        /// <summary>
-        /// Constructor. Links the batch to its predecessor and registers a listener on the batch future.
-        /// </summary>
-        /// <param name="prev">Previous batch, or null if there is none.</param>
-        public DataStreamerBatch(DataStreamerBatch<TK, TV> prev)
-        {
-            _prev = prev;
-
-            if (prev != null)
-                Thread.MemoryBarrier(); // Prevent "prev" field escape.
-
-            _fut.Listen(() => ParentsCompleted());
-        }
-
-        /// <summary>
-        /// Gets the future of this batch.
-        /// </summary>
-        public IFuture Future
-        {
-            get { return _fut; }
-        }
-
-        /// <summary>
-        /// Add object to the batch.
-        /// </summary>
-        /// <param name="val">Value to enqueue.</param>
-        /// <param name="cnt">Number of items that the value contributes to the batch size.</param>
-        /// <returns>New batch size after the addition, or -1 if no more additions are allowed.</returns>
-        public int Add(object val, int cnt)
-        {
-            // If we cannot enter read-lock immediately, then send is scheduled and batch is definitely blocked.
-            if (!_rwLock.TryEnterReadLock(0))
-                return -1;
-
-            try 
-            {
-                // 1. Ensure additions are possible
-                if (_sndGuard)
-                    return -1;
-
-                // 2. Add data and increase size.
-                _queue.Enqueue(val);
-
-#pragma warning disable 0420
-                int newSize = Interlocked.Add(ref _size, cnt);
-#pragma warning restore 0420
-
-                return newSize;
-            }
-            finally
-            {
-                _rwLock.ExitReadLock();
-            }
-        }
-
-        /// <summary>
-        /// Internal send routine: sends previous batches first, then closes and writes this batch.
-        /// </summary>
-        /// <param name="ldr">Streamer used to perform the update.</param>
-        /// <param name="plc">Policy written ahead of the batch; PlcCancelClose skips the batch content.</param>
-        public void Send(DataStreamerImpl<TK, TV> ldr, int plc)
-        {
-            // 1. Delegate to the previous batch first.
-            DataStreamerBatch<TK, TV> prev0 = _prev;
-
-            if (prev0 != null)
-                prev0.Send(ldr, DataStreamerImpl<TK, TV>.PlcContinue);
-
-            // 2. Set guard under the write lock; if it is already set, this call returns without sending.
-            _rwLock.EnterWriteLock();
-
-            try
-            {
-                if (_sndGuard)
-                    return;
-                else
-                    _sndGuard = true;
-            }
-            finally
-            {
-                _rwLock.ExitWriteLock();
-            }
-
-            var handleRegistry = ldr.Marshaller.Ignite.HandleRegistry;
-
-            long futHnd = 0;
-
-            // 3. Actual send; the future handle is released again if writing the batch throws.
-            ldr.Update(writer =>
-            {
-                writer.WriteInt(plc);
-
-                if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose)
-                {
-                    futHnd = handleRegistry.Allocate(_fut);
-
-                    try
-                    {
-                        writer.WriteLong(futHnd);
-
-                        WriteTo(writer);
-                    }
-                    catch (Exception)
-                    {
-                        handleRegistry.Release(futHnd);
-
-                        throw;
-                    }
-                }
-            });
-
-            if (plc == DataStreamerImpl<TK, TV>.PlcCancelClose || _size == 0)
-            {
-                _fut.OnNullResult();
-                
-                handleRegistry.Release(futHnd);
-            }
-        }
-
-
-        /// <summary>
-        /// Await completion of current and all previous loads; errors of individual batches are swallowed.
-        /// </summary>
-        public void AwaitCompletion()
-        {
-            DataStreamerBatch<TK, TV> curBatch = this;
-
-            while (curBatch != null)
-            {
-                try
-                {
-                    curBatch._fut.Get();
-                }
-                catch (Exception)
-                {
-                    // Ignore: this method only waits, it does not report batch failures.
-                }
-
-                curBatch = curBatch._prev;
-            }
-        }
-
-        /// <summary>
-        /// Write batch content: the batch size followed by the key/value pairs drained from the queue.
-        /// </summary>
-        /// <param name="writer">Portable writer.</param>
-        private void WriteTo(PortableWriterImpl writer)
-        {
-            writer.WriteInt(_size);
-
-            object val;
-
-            while (_queue.TryDequeue(out val))
-            {
-                // 1. Is it a collection?
-                ICollection<KeyValuePair<TK, TV>> entries = val as ICollection<KeyValuePair<TK, TV>>;
-
-                if (entries != null)
-                {
-                    foreach (KeyValuePair<TK, TV> item in entries)
-                    {
-                        writer.Write(item.Key);
-                        writer.Write(item.Value);
-                    }
-
-                    continue;
-                }
-
-                // 2. Is it a single entry?
-                DataStreamerEntry<TK, TV> entry = val as DataStreamerEntry<TK, TV>;
-
-                if (entry != null) {
-                    writer.Write(entry.Key);
-                    writer.Write(entry.Value);
-
-                    continue;
-                }
-
-                // 3. Is it a remove marker? It is written as the key with a null value; other types are skipped.
-                DataStreamerRemoveEntry<TK> rmvEntry = val as DataStreamerRemoveEntry<TK>;
-
-                if (rmvEntry != null)
-                {
-                    writer.Write(rmvEntry.Key);
-                    writer.Write<object>(null);
-                }
-            }
-        }
-
-        /// <summary>
-        /// Check whether all previous batches are completed, unlinking the previous batch once it is.
-        /// </summary>
-        /// <returns>True if all previous batches are completed and this batch's future is done.</returns>
-        private bool ParentsCompleted()
-        {
-            DataStreamerBatch<TK, TV> prev0 = _prev;
-
-            if (prev0 != null)
-            {
-                if (prev0.ParentsCompleted())
-                    _prev = null;
-                else
-                    return false;
-            }
-
-            return _fut.IsDone;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
deleted file mode 100644
index 41ee176..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Datastream
-{
-    /// <summary>
-    /// Data streamer entry: a key/value pair fixed at construction time.
-    /// </summary>
-    internal class DataStreamerEntry<TK, TV>
-    {
-        /** Key. */
-        private readonly TK _key;
-
-        /** Value. */
-        private readonly TV _val;
-
-        /// <summary>
-        /// Constructor.
-        /// </summary>
-        /// <param name="key">Key of the entry.</param>
-        /// <param name="val">Value of the entry.</param>
-        public DataStreamerEntry(TK key, TV val)
-        {
-            _key = key;
-            _val = val;
-        }
-
-        /// <summary>
-        /// Key passed to the constructor.
-        /// </summary>
-        public TK Key
-        {
-            get
-            {
-                return _key;
-            }
-        }
-
-        /// <summary>
-        /// Value passed to the constructor.
-        /// </summary>
-        public TV Value
-        {
-            get
-            {
-                return _val;
-            }
-        }
-    }
-}
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
deleted file mode 100644
index bf11397..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ /dev/null
@@ -1,832 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Datastream
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Threading;
-    using Apache.Ignite.Core.Common;
-    using Apache.Ignite.Core.Datastream;
-    using Apache.Ignite.Core.Impl.Common;
-    using Apache.Ignite.Core.Impl.Portable;
-    using Apache.Ignite.Core.Impl.Unmanaged;
-    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
-
-    /// <summary>
-    /// Data streamer internal interface to get rid of generics.
-    /// </summary>
-    internal interface IDataStreamer
-    {
-        /// <summary>
-        /// Callback invoked on topology size change.
-        /// </summary>
-        /// <param name="topVer">New topology version.</param>
-        /// <param name="topSize">New topology size.</param>
-        void TopologyChange(long topVer, int topSize);
-    }
-
-    /// <summary>
-    /// Data streamer implementation.
-    /// </summary>
-    internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV>
-    {
-
-#pragma warning disable 0420
-
-        /** Policy: continue. */
-        internal const int PlcContinue = 0;
-
-        /** Policy: close. */
-        internal const int PlcClose = 1;
-
-        /** Policy: cancel and close. */
-        internal const int PlcCancelClose = 2;
-
-        /** Policy: flush. */
-        internal const int PlcFlush = 3;
-        
-        /** Operation: update. */
-        private const int OpUpdate = 1;
-        
-        /** Operation: set receiver. */
-        private const int OpReceiver = 2;
-        
-        /** Cache name. */
-        private readonly string _cacheName;
-
-        /** Lock. */
-        private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
-
-        /** Closed event. */
-        private readonly ManualResetEventSlim _closedEvt = new ManualResetEventSlim(false);
-
-        /** Close future. */
-        private readonly Future<object> _closeFut = new Future<object>();
-
-        /** GC handle to this streamer. */
-        private readonly long _hnd;
-                
-        /** Topology version. */
-        private long _topVer;
-
-        /** Topology size. */
-        private int _topSize;
-        
-        /** Buffer send size. */
-        private volatile int _bufSndSize;
-
-        /** Current data streamer batch. */
-        private volatile DataStreamerBatch<TK, TV> _batch;
-
-        /** Flusher. */
-        private readonly Flusher<TK, TV> _flusher;
-
-        /** Receiver. */
-        private volatile IStreamReceiver<TK, TV> _rcv;
-
-        /** Receiver handle. */
-        private long _rcvHnd;
-
-        /** Receiver portable mode. */
-        private readonly bool _keepPortable;
-
-        /// <summary>
-        /// Constructor.
-        /// </summary>
-        /// <param name="target">Target.</param>
-        /// <param name="marsh">Marshaller.</param>
-        /// <param name="cacheName">Cache name.</param>
-        /// <param name="keepPortable">Portable flag.</param>
-        public DataStreamerImpl(IUnmanagedTarget target, PortableMarshaller marsh, string cacheName, bool keepPortable)
-            : base(target, marsh)
-        {
-            _cacheName = cacheName;
-            _keepPortable = keepPortable;
-
-            // Create empty batch.
-            _batch = new DataStreamerBatch<TK, TV>();
-
-            // Allocate GC handle so that this data streamer could be easily dereferenced from native code.
-            WeakReference thisRef = new WeakReference(this);
-
-            _hnd = marsh.Ignite.HandleRegistry.Allocate(thisRef);
-
-            // Start topology listening. This call will ensure that buffer size member is updated.
-            UU.DataStreamerListenTopology(target, _hnd);
-
-            // Membar to ensure fields initialization before leaving constructor.
-            Thread.MemoryBarrier();
-
-            // Start flusher after everything else is initialized.
-            _flusher = new Flusher<TK, TV>(thisRef);
-
-            _flusher.RunThread();
-        }
-
-        /** <inheritDoc /> */
-        public string CacheName
-        {
-            get { return _cacheName; }
-        }
-
-        /** <inheritDoc /> */
-        public bool AllowOverwrite
-        {
-            get
-            {
-                _rwLock.EnterReadLock();
-
-                try
-                {
-                    ThrowIfDisposed();
-
-                    return UU.DataStreamerAllowOverwriteGet(Target);
-                }
-                finally
-                {
-                    _rwLock.ExitReadLock();
-                }
-            }
-            set
-            {
-                _rwLock.EnterWriteLock();
-
-                try
-                {
-                    ThrowIfDisposed();
-
-                    UU.DataStreamerAllowOverwriteSet(Target, value);
-                }
-                finally
-                {
-                    _rwLock.ExitWriteLock();
-                }
-            }
-        }
-
-        /** <inheritDoc /> */
-        public bool SkipStore
-        {
-            get
-            {
-                _rwLock.EnterReadLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    return UU.DataStreamerSkipStoreGet(Target);
-                }
-                finally
-                {
-                    _rwLock.ExitReadLock();
-                }
-            }
-            set
-            {
-                _rwLock.EnterWriteLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    UU.DataStreamerSkipStoreSet(Target, value);
-                }
-                finally
-                {
-                    _rwLock.ExitWriteLock();
-                }
-            }
-        }
-
-        /** <inheritDoc /> */
-        public int PerNodeBufferSize
-        {
-            get
-            {
-                _rwLock.EnterReadLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    return UU.DataStreamerPerNodeBufferSizeGet(Target);
-                }
-                finally
-                {
-                    _rwLock.ExitReadLock();
-                }
-            }
-            set
-            {
-                _rwLock.EnterWriteLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    UU.DataStreamerPerNodeBufferSizeSet(Target, value);
-
-                    _bufSndSize = _topSize * value;
-                }
-                finally
-                {
-                    _rwLock.ExitWriteLock();
-                }
-            }
-        }
-
-        /** <inheritDoc /> */
-        public int PerNodeParallelOperations
-        {
-            get
-            {
-                _rwLock.EnterReadLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    return UU.DataStreamerPerNodeParallelOperationsGet(Target);
-                }
-                finally
-                {
-                    _rwLock.ExitReadLock();
-                }
-
-            }
-            set
-            {
-                _rwLock.EnterWriteLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    UU.DataStreamerPerNodeParallelOperationsSet(Target, value);
-                }
-                finally
-                {
-                    _rwLock.ExitWriteLock();
-                }
-
-            }
-        }
-
-        /** <inheritDoc /> */
-        public long AutoFlushFrequency
-        {
-            get
-            {
-                _rwLock.EnterReadLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    return _flusher.Frequency;
-                }
-                finally
-                {
-                    _rwLock.ExitReadLock();
-                }
-
-            }
-            set
-            {
-                _rwLock.EnterWriteLock(); 
-                
-                try
-                {
-                    ThrowIfDisposed();
-
-                    _flusher.Frequency = value;
-                }
-                finally
-                {
-                    _rwLock.ExitWriteLock();
-                }
-            }
-        }
-
-        /** <inheritDoc /> */
-        public IFuture Future
-        {
-            get
-            {
-                ThrowIfDisposed();
-
-                return _closeFut;
-            }
-        }
-
-        /** <inheritDoc /> */
-        public IStreamReceiver<TK, TV> Receiver
-        {
-            get
-            {
-                ThrowIfDisposed();
-
-                return _rcv;
-            }
-            set
-            {
-                IgniteArgumentCheck.NotNull(value, "value");
-
-                var handleRegistry = Marshaller.Ignite.HandleRegistry;
-
-                _rwLock.EnterWriteLock();
-
-                try
-                {
-                    ThrowIfDisposed();
-
-                    if (_rcv == value)
-                        return;
-
-                    var rcvHolder = new StreamReceiverHolder(value,
-                        (rec, grid, cache, stream, keepPortable) =>
-                            StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream,
-                                keepPortable));
-
-                    var rcvHnd0 = handleRegistry.Allocate(rcvHolder);
-
-                    try
-                    {
-                        DoOutOp(OpReceiver, w =>
-                        {
-                            w.WriteLong(rcvHnd0);
-
-                            w.WriteObject(rcvHolder);
-                        });
-                    }
-                    catch (Exception)
-                    {
-                        handleRegistry.Release(rcvHnd0);
-                        throw;
-                    }
-
-                    if (_rcv != null)
-                        handleRegistry.Release(_rcvHnd);
-
-                    _rcv = value;
-                    _rcvHnd = rcvHnd0;
-                }
-                finally
-                {
-                    _rwLock.ExitWriteLock();
-                }
-            }
-        }
-
-        /** <inheritDoc /> */
-        public IFuture AddData(TK key, TV val)
-        {
-            ThrowIfDisposed(); 
-            
-            IgniteArgumentCheck.NotNull(key, "key");
-
-            return Add0(new DataStreamerEntry<TK, TV>(key, val), 1);
-        }
-
-        /** <inheritDoc /> */
-        public IFuture AddData(KeyValuePair<TK, TV> pair)
-        {
-            ThrowIfDisposed();
-
-            return Add0(new DataStreamerEntry<TK, TV>(pair.Key, pair.Value), 1);
-        }
-        
-        /** <inheritDoc /> */
-        public IFuture AddData(ICollection<KeyValuePair<TK, TV>> entries)
-        {
-            ThrowIfDisposed();
-
-            IgniteArgumentCheck.NotNull(entries, "entries");
-
-            return Add0(entries, entries.Count);
-        }
-
-        /** <inheritDoc /> */
-        public IFuture RemoveData(TK key)
-        {
-            ThrowIfDisposed();
-
-            IgniteArgumentCheck.NotNull(key, "key");
-
-            return Add0(new DataStreamerRemoveEntry<TK>(key), 1);
-        }
-
-        /** <inheritDoc /> */
-        public void TryFlush()
-        {
-            ThrowIfDisposed();
-
-            DataStreamerBatch<TK, TV> batch0 = _batch;
-
-            if (batch0 != null)
-                Flush0(batch0, false, PlcFlush);
-        }
-
-        /** <inheritDoc /> */
-        public void Flush()
-        {
-            ThrowIfDisposed();
-
-            DataStreamerBatch<TK, TV> batch0 = _batch;
-
-            if (batch0 != null)
-                Flush0(batch0, true, PlcFlush);
-            else 
-            {
-                // Batch is null, i.e. data streamer is closing. Wait for close to complete.
-                _closedEvt.Wait();
-            }
-        }
-
-        /** <inheritDoc /> */
-        public void Close(bool cancel)
-        {
-            _flusher.Stop();
-
-            while (true)
-            {
-                DataStreamerBatch<TK, TV> batch0 = _batch;
-
-                if (batch0 == null)
-                {
-                    // Wait for concurrent close to finish.
-                    _closedEvt.Wait();
-
-                    return;
-                }
-
-                if (Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
-                {
-                    _closeFut.OnDone(null, null);
-
-                    _rwLock.EnterWriteLock(); 
-                    
-                    try
-                    {
-                        base.Dispose(true);
-
-                        if (_rcv != null)
-                            Marshaller.Ignite.HandleRegistry.Release(_rcvHnd);
-
-                        _closedEvt.Set();
-                    }
-                    finally
-                    {
-                        _rwLock.ExitWriteLock();
-                    }
-
-                    Marshaller.Ignite.HandleRegistry.Release(_hnd);
-
-                    break;
-                }
-            }
-        }
-
-        /** <inheritDoc /> */
-        public IDataStreamer<TK1, TV1> WithKeepPortable<TK1, TV1>()
-        {
-            if (_keepPortable)
-            {
-                var result = this as IDataStreamer<TK1, TV1>;
-
-                if (result == null)
-                    throw new InvalidOperationException(
-                        "Can't change type of portable streamer. WithKeepPortable has been called on an instance of " +
-                        "portable streamer with incompatible generic arguments.");
-
-                return result;
-            }
-
-            return new DataStreamerImpl<TK1, TV1>(UU.ProcessorDataStreamer(Marshaller.Ignite.InteropProcessor,
-                _cacheName, true), Marshaller, _cacheName, true);
-        }
-
-        /** <inheritDoc /> */
-        protected override void Dispose(bool disposing)
-        {
-            if (disposing)
-                Close(false);  // Normal dispose: do not cancel
-            else
-            {
-                // Finalizer: just close Java streamer
-                try
-                {
-                    if (_batch != null)
-                        _batch.Send(this, PlcCancelClose);
-                }
-                catch (Exception)
-                {
-                    // Finalizers should never throw
-                }
-
-                Marshaller.Ignite.HandleRegistry.Release(_hnd, true);
-                Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true);
-
-                base.Dispose(false);
-            }
-        }
-
-        /** <inheritDoc /> */
-        ~DataStreamerImpl()
-        {
-            Dispose(false);
-        }
-
-        /** Finalizer; delegates to Dispose(false). */
-        public void TopologyChange(long topVer, int topSize)
-        {
-            _rwLock.EnterWriteLock(); 
-            
-            try
-            {
-                ThrowIfDisposed();
-
-                if (_topVer < topVer)
-                {
-                    _topVer = topVer;
-                    _topSize = topSize;
-
-                    _bufSndSize = topSize * UU.DataStreamerPerNodeBufferSizeGet(Target);
-                }
-            }
-            finally
-            {
-                _rwLock.ExitWriteLock();
-            }
-
-        }
-
-        /// <summary>
-        /// Internal add/remove routine.
-        /// </summary>
-        /// <param name="val">Value.</param>
-        /// <param name="cnt">Items count.</param>
-        /// <returns>Future.</returns>
-        private IFuture Add0(object val, int cnt)
-        {
-            int bufSndSize0 = _bufSndSize;
-
-            while (true)
-            {
-                var batch0 = _batch;
-
-                if (batch0 == null)
-                    throw new InvalidOperationException("Data streamer is stopped.");
-
-                int size = batch0.Add(val, cnt);
-
-                if (size == -1)
-                {
-                    // Batch is blocked, perform CAS.
-                    Interlocked.CompareExchange(ref _batch,
-                        new DataStreamerBatch<TK, TV>(batch0), batch0);
-
-                    continue;
-                }
-                if (size >= bufSndSize0)
-                    // Batch is too big, schedule flush.
-                    Flush0(batch0, false, PlcContinue);
-
-                return batch0.Future;
-            }
-        }
-
-        /// <summary>
-        /// Internal flush routine.
-        /// </summary>
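-        /// <param name="curBatch">Batch to send; also the value the current batch is expected to hold for the CAS.</param>
-        /// <param name="wait">Whether to wait for flush to complete.</param>
-        /// <param name="plc">Flush policy (one of the Plc* constants); close policies replace the current batch with null.</param>
-        /// <returns>Whether this call was able to CAS previous batch.</returns>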
-        private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc)
-        {
-            // 1. Try setting new current batch to help further adders. 
-            bool res = Interlocked.CompareExchange(ref _batch, 
-                (plc == PlcContinue || plc == PlcFlush) ? 
-                new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch;
-
-            // 2. Perform actual send.
-            curBatch.Send(this, plc);
-
-            if (wait)
-                // 3. Wait for all futures to finish.
-                curBatch.AwaitCompletion();
-
-            return res;
-        }
-
-        /// <summary>
-        /// Performs an update operation while holding the read lock.
-        /// </summary>
-        /// <param name="action">Writer action passed to the update operation.</param>
-        internal void Update(Action<PortableWriterImpl> action)
-        {
-            _rwLock.EnterReadLock();
-
-            try
-            {
-                ThrowIfDisposed();
-
-                DoOutOp(OpUpdate, action);
-            }
-            finally
-            {
-                _rwLock.ExitReadLock();
-            }
-        }
-
-        /// <summary>
-        /// Flusher.
-        /// </summary>
-        private class Flusher<TK1, TV1>
-        {
-            /** State: running. */
-            private const int StateRunning = 0;
-
-            /** State: stopping. */
-            private const int StateStopping = 1;
-
-            /** State: stopped. */
-            private const int StateStopped = 2;
-
-            /** Data streamer. */
-            private readonly WeakReference _ldrRef;
-
-            /** Flusher state: StateRunning, StateStopping or StateStopped. */
-            private int _state;
-
-            /** Flush frequency. */
-            private long _freq;
-
-            /// <summary>
-            /// Constructor.
-            /// </summary>
-            /// <param name="ldrRef">Data streamer weak reference.</param>
-            public Flusher(WeakReference ldrRef)
-            {
-                _ldrRef = ldrRef;
-
-                lock (this)
-                {
-                    _state = StateRunning;
-                }
-            }
-
-            /// <summary>
-            /// Main flusher routine.
-            /// </summary>
-            private void Run()
-            {
-                bool force = false;
-                long curFreq = 0;
-                
-                try
-                {
-                    while (true)
-                    {
-                        if (curFreq > 0 || force)
-                        {
-                            var ldr = _ldrRef.Target as DataStreamerImpl<TK1, TV1>;
-
-                            if (ldr == null)
-                                return;
-
-                            ldr.TryFlush();
-
-                            force = false;
-                        }
-
-                        lock (this)
-                        {
-                            // Stop immediately.
-                            if (_state == StateStopping)
-                                return;
-
-                            if (curFreq == _freq)
-                            {
-                                // Frequency is unchanged
-                                if (curFreq == 0)
-                                    // Just wait for a second and re-try.
-                                    Monitor.Wait(this, 1000);
-                                else
-                                {
-                                    // Calculate remaining time.
-                                    DateTime now = DateTime.Now;
-
-                                    long ticks;
-
-                                    try
-                                    {
-                                        ticks = now.AddMilliseconds(curFreq).Ticks - now.Ticks;
-
-                                        if (ticks > int.MaxValue)
-                                            ticks = int.MaxValue;
-                                    }
-                                    catch (ArgumentOutOfRangeException)
-                                    {
-                                        // Handle possible overflow.
-                                        ticks = int.MaxValue;
-                                    }
-
-                                    Monitor.Wait(this, TimeSpan.FromTicks(ticks));
-                                }
-                            }
-                            else
-                            {
-                                if (curFreq != 0)
-                                    force = true;
-
-                                curFreq = _freq;
-                            } 
-                        }
-                    }
-                }
-                finally
-                {
-                    // Let streamer know about stop.
-                    lock (this)
-                    {
-                        _state = StateStopped;
-
-                        Monitor.PulseAll(this);
-                    }
-                }
-            }
-            
-            /// <summary>
-            /// Frequency.
-            /// </summary>
-            public long Frequency
-            {
-                get
-                {
-                    return Interlocked.Read(ref _freq);
-                }
-
-                set
-                {
-                    lock (this)
-                    {
-                        if (_freq != value)
-                        {
-                            _freq = value;
-
-                            Monitor.PulseAll(this);
-                        }
-                    }
-                }
-            }
-
-            /// <summary>
-            /// Stop flusher.
-            /// </summary>
-            public void Stop()
-            {
-                lock (this)
-                {
-                    if (_state == StateRunning)
-                    {
-                        _state = StateStopping;
-
-                        Monitor.PulseAll(this);
-                    }
-
-                    while (_state != StateStopped)
-                        Monitor.Wait(this);
-                }
-            }
-
-            /// <summary>
-            /// Runs the flusher thread.
-            /// </summary>
-            public void RunThread()
-            {
-                new Thread(Run).Start();
-            }
-        }
-
-#pragma warning restore 0420
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
deleted file mode 100644
index 7e65934..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Datastream
-{
-    /// <summary>
-    /// Remove marker.
-    /// </summary>
-    internal class DataStreamerRemoveEntry<TK>
-    {
-        /** Key to remove. */
-        private readonly TK _key;
-
-        /// <summary>
-        /// Constructor.
-        /// </summary>
-        /// <param name="key">Key.</param>
-        public DataStreamerRemoveEntry(TK key)
-        {
-            _key = key;
-        }
-
-        /// <summary>
-        /// Key.
-        /// </summary>
-        public TK Key
-        {
-            get
-            {
-                return _key;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
deleted file mode 100644
index 5a7c104..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Datastream
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Diagnostics;
-    using Apache.Ignite.Core.Cache;
-    using Apache.Ignite.Core.Datastream;
-    using Apache.Ignite.Core.Impl.Cache;
-    using Apache.Ignite.Core.Impl.Common;
-    using Apache.Ignite.Core.Impl.Portable;
-    using Apache.Ignite.Core.Impl.Portable.IO;
-    using Apache.Ignite.Core.Impl.Unmanaged;
-    using Apache.Ignite.Core.Portable;
-
-    /// <summary>
-    /// Portable wrapper for <see cref="IStreamReceiver{TK,TV}"/>.
-    /// </summary>
-    internal class StreamReceiverHolder : IPortableWriteAware
-    {
-        /** Receiver type marker: regular receiver. */
-        private const byte RcvNormal = 0;
-
-        /** Receiver type marker: entry processor that is wrapped into a stream transformer on read. */
-        public const byte RcvTransformer = 1;
-
-        /** Generic receiver. */
-        private readonly object _rcv;
-        
-        /** Invoker delegate. */
-        private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _invoke;
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class.
-        /// </summary>
-        /// <param name="reader">The reader.</param>
-        public StreamReceiverHolder(PortableReaderImpl reader)
-        {
-            var rcvType = reader.ReadByte();
-
-            _rcv = PortableUtils.ReadPortableOrSerializable<object>(reader);
-            
-            Debug.Assert(_rcv != null);
-
-            var type = _rcv.GetType();
-
-            if (rcvType == RcvTransformer)
-            {
-                // rcv is a user ICacheEntryProcessor<K, V, A, R>, construct StreamTransformer from it.
-                // (we can't marshal StreamTransformer directly, because it is generic, 
-                // and we do not know type arguments that user will have)
-                _rcv = DelegateTypeDescriptor.GetStreamTransformerCtor(type)(_rcv);
-            }
-
-            _invoke = DelegateTypeDescriptor.GetStreamReceiver(_rcv.GetType());
-        }
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class.
-        /// </summary>
-        /// <param name="rcv">Receiver.</param>
-        /// <param name="invoke">Invoke delegate.</param>
-        public StreamReceiverHolder(object rcv, 
-            Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> invoke)
-        {
-            Debug.Assert(rcv != null);
-            Debug.Assert(invoke != null);
-
-            _rcv = rcv;
-            _invoke = invoke;
-        }
-
-        /** <inheritdoc /> */
-        public void WritePortable(IPortableWriter writer)
-        {
-            var w = writer.RawWriter();
-
-            var writeAware = _rcv as IPortableWriteAware;
-
-            if (writeAware != null)
-                writeAware.WritePortable(writer);
-            else
-            {
-                w.WriteByte(RcvNormal);
-                PortableUtils.WritePortableOrSerializable((PortableWriterImpl) writer, _rcv);
-            }
-        }
-
-        /// <summary>
-        /// Updates cache with batch of entries.
-        /// </summary>
-        /// <param name="grid">The grid.</param>
-        /// <param name="cache">Cache.</param>
-        /// <param name="stream">Stream.</param>
-        /// <param name="keepPortable">Portable flag.</param>
-        public void Receive(Ignite grid, IUnmanagedTarget cache, IPortableStream stream, bool keepPortable)
-        {
-            Debug.Assert(grid != null);
-            Debug.Assert(cache != null);
-            Debug.Assert(stream != null);
-
-            _invoke(_rcv, grid, cache, stream, keepPortable);
-        }
-
-        /// <summary>
-        /// Invokes the receiver.
-        /// </summary>
-        /// <param name="receiver">Receiver.</param>
-        /// <param name="grid">Grid.</param>
-        /// <param name="cache">Cache.</param>
-        /// <param name="stream">Stream.</param>
-        /// <param name="keepPortable">Portable flag.</param>
-        public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, IUnmanagedTarget cache,
-            IPortableStream stream, bool keepPortable)
-        {
-            var reader = grid.Marshaller.StartUnmarshal(stream, keepPortable);
-
-            var size = reader.ReadInt();
-
-            var entries = new List<ICacheEntry<TK, TV>>(size);
-
-            for (var i = 0; i < size; i++)
-                entries.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>()));
-
-            receiver.Receive(grid.Cache<TK, TV>(cache, keepPortable), entries);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
deleted file mode 100644
index 3972bb0..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ /dev/null
@@ -1,498 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Events
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Diagnostics;
-    using System.Diagnostics.CodeAnalysis;
-    using System.Linq;
-    using Apache.Ignite.Core.Cluster;
-    using Apache.Ignite.Core.Common;
-    using Apache.Ignite.Core.Events;
-    using Apache.Ignite.Core.Impl.Common;
-    using Apache.Ignite.Core.Impl.Handle;
-    using Apache.Ignite.Core.Impl.Portable;
-    using Apache.Ignite.Core.Impl.Portable.IO;
-    using Apache.Ignite.Core.Impl.Unmanaged;
-    using Apache.Ignite.Core.Portable;
-    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
-
-    /// <summary>
-    /// Ignite events.
-    /// </summary>
-    internal class Events : PlatformTarget, IEvents
-    {
-        /// <summary>
-        /// Opcodes.
-        /// </summary>
-        protected enum Op
-        {
-            RemoteQuery = 1,
-            RemoteListen = 2,
-            StopRemoteListen = 3,
-            WaitForLocal = 4,
-            LocalQuery = 5,
-            RecordLocal = 6,
-            EnableLocal = 8,
-            DisableLocal = 9,
-            GetEnabledEvents = 10
-        }
-
-        /** Map from user func to local wrapper, needed for invoke/unsubscribe. */
-        private readonly Dictionary<object, Dictionary<int, LocalHandledEventFilter>> _localFilters
-            = new Dictionary<object, Dictionary<int, LocalHandledEventFilter>>();
-
-        /** Grid. */
-        protected readonly Ignite Ignite;
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="Events"/> class.
-        /// </summary>
-        /// <param name="target">Target.</param>
-        /// <param name="marsh">Marshaller.</param>
-        /// <param name="clusterGroup">Cluster group.</param>
-        public Events(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup)
-            : base(target, marsh)
-        {
-            Debug.Assert(clusterGroup != null);
-
-            ClusterGroup = clusterGroup;
-
-            Ignite = (Ignite) clusterGroup.Ignite;
-        }
-
-        /** <inheritDoc /> */
-        public virtual IEvents WithAsync()
-        {
-            return new EventsAsync(UU.EventsWithAsync(Target), Marshaller, ClusterGroup);
-        }
-
-        /** <inheritDoc /> */
-        public virtual bool IsAsync
-        {
-            get { return false; }
-        }
-
-        /** <inheritDoc /> */
-        public virtual IFuture GetFuture()
-        {
-            throw IgniteUtils.GetAsyncModeDisabledException();
-        }
-
-        /** <inheritDoc /> */
-        public virtual IFuture<TResult> GetFuture<TResult>()
-        {
-            throw IgniteUtils.GetAsyncModeDisabledException();
-        }
-
-        /** <inheritDoc /> */
-        public IClusterGroup ClusterGroup { get; private set; }
-
-        /** <inheritDoc /> */
-        public virtual List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
-            where T : IEvent
-        {
-            IgniteArgumentCheck.NotNull(filter, "filter");
-
-            return DoOutInOp((int) Op.RemoteQuery,
-                writer =>
-                {
-                    writer.Write(new PortableOrSerializableObjectHolder(filter));
-
-                    writer.WriteLong((long) (timeout == null ? 0 : timeout.Value.TotalMilliseconds));
-
-                    WriteEventTypes(types, writer);
-                },
-                reader => ReadEvents<T>(reader));
-        }
-
-        /** <inheritDoc /> */
-        public virtual Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
-            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
-            where T : IEvent
-        {
-            IgniteArgumentCheck.Ensure(bufSize > 0, "bufSize", "should be > 0");
-            IgniteArgumentCheck.Ensure(interval == null || interval.Value.TotalMilliseconds > 0, "interval", "should be null or >= 0");
-
-            return DoOutInOp((int) Op.RemoteListen,
-                writer =>
-                {
-                    writer.WriteInt(bufSize);
-                    writer.WriteLong((long) (interval == null ? 0 : interval.Value.TotalMilliseconds));
-                    writer.WriteBoolean(autoUnsubscribe);
-
-                    writer.WriteBoolean(localListener != null);
-
-                    if (localListener != null)
-                    {
-                        var listener = new RemoteListenEventFilter(Ignite, (id, e) => localListener.Invoke(id, (T) e));
-                        writer.WriteLong(Ignite.HandleRegistry.Allocate(listener));
-                    }
-
-                    writer.WriteBoolean(remoteFilter != null);
-
-                    if (remoteFilter != null)
-                        writer.Write(new PortableOrSerializableObjectHolder(remoteFilter));
-
-                    WriteEventTypes(types, writer);
-                },
-                reader => Marshaller.StartUnmarshal(reader).ReadGuid() ?? Guid.Empty);
-        }
-
-        /** <inheritDoc /> */
-        public virtual void StopRemoteListen(Guid opId)
-        {
-            DoOutOp((int) Op.StopRemoteListen, writer =>
-            {
-                Marshaller.StartMarshal(writer).WriteGuid(opId);
-            });
-        }
-
-        /** <inheritDoc /> */
-        public IEvent WaitForLocal(params int[] types)
-        {
-            return WaitForLocal<IEvent>(null, types);
-        }
-
-        /** <inheritDoc /> */
-        public virtual T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent
-        {
-            long hnd = 0;
-
-            try
-            {
-                return WaitForLocal0(filter, ref hnd, types);
-            }
-            finally
-            {
-                if (filter != null)
-                    Ignite.HandleRegistry.Release(hnd);
-            }
-        }
-
-        /** <inheritDoc /> */
-        public List<IEvent> LocalQuery(params int[] types)
-        {
-            return DoOutInOp((int) Op.LocalQuery,
-                writer => WriteEventTypes(types, writer),
-                reader => ReadEvents<IEvent>(reader));
-        }
-
-        /** <inheritDoc /> */
-        public void RecordLocal(IEvent evt)
-        {
-            throw new NotImplementedException("GG-10244");
-        }
-
-        /** <inheritDoc /> */
-        public void LocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
-        {
-            IgniteArgumentCheck.NotNull(listener, "listener");
-            IgniteArgumentCheck.NotNullOrEmpty(types, "types");
-
-            foreach (var type in types)
-                LocalListen(listener, type);
-        }
-
-        /** <inheritDoc /> */
-        public bool StopLocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
-        {
-            lock (_localFilters)
-            {
-                Dictionary<int, LocalHandledEventFilter> filters;
-
-                if (!_localFilters.TryGetValue(listener, out filters))
-                    return false;
-
-                var success = false;
-
-                // Should do this inside lock to avoid race with subscription
-                // ToArray is required because we are going to modify underlying dictionary during enumeration
-                foreach (var filter in GetLocalFilters(listener, types).ToArray())
-                    success |= UU.EventsStopLocalListen(Target, filter.Handle);
-
-                return success;
-            }
-        }
-
-        /** <inheritDoc /> */
-        public void EnableLocal(params int[] types)
-        {
-            IgniteArgumentCheck.NotNullOrEmpty(types, "types");
-
-            DoOutOp((int)Op.EnableLocal, writer => WriteEventTypes(types, writer));
-        }
-
-        /** <inheritDoc /> */
-        public void DisableLocal(params int[] types)
-        {
-            IgniteArgumentCheck.NotNullOrEmpty(types, "types");
-
-            DoOutOp((int)Op.DisableLocal, writer => WriteEventTypes(types, writer));
-        }
-
-        /** <inheritDoc /> */
-        public int[] GetEnabledEvents()
-        {
-            return DoInOp((int)Op.GetEnabledEvents, reader => ReadEventTypes(reader));
-        }
-
-        /** <inheritDoc /> */
-        public bool IsEnabled(int type)
-        {
-            return UU.EventsIsEnabled(Target, type);
-        }
-
-        /// <summary>
-        /// Waits for the specified events.
-        /// </summary>
-        /// <typeparam name="T">Type of events.</typeparam>
-        /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param>
-        /// <param name="handle">The filter handle, if applicable.</param>
-        /// <param name="types">Types of the events to wait for. 
-        /// If not provided, all events will be passed to the filter.</param>
-        /// <returns>Ignite event.</returns>
-        protected T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent
-        {
-            if (filter != null)
-                handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter
-                {
-                    InvokeFunc = stream => InvokeLocalFilter(stream, filter)
-                });
-
-            var hnd = handle;
-
-            return DoOutInOp((int)Op.WaitForLocal,
-                writer =>
-                {
-                    if (filter != null)
-                    {
-                        writer.WriteBoolean(true);
-                        writer.WriteLong(hnd);
-                    }
-                    else
-                        writer.WriteBoolean(false);
-
-                    WriteEventTypes(types, writer);
-                },
-                reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader)));
-        }
-
-        /// <summary>
-        /// Reads events from a portable stream.
-        /// </summary>
-        /// <typeparam name="T">Event type.</typeparam>
-        /// <param name="reader">Reader.</param>
-        /// <returns>Resulting list or null.</returns>
-        private List<T> ReadEvents<T>(IPortableStream reader) where T : IEvent
-        {
-            return ReadEvents<T>(Marshaller.StartUnmarshal(reader));
-        }
-
-        /// <summary>
-        /// Reads events from a portable reader.
-        /// </summary>
-        /// <typeparam name="T">Event type.</typeparam>
-        /// <param name="portableReader">Reader.</param>
-        /// <returns>Resulting list or null.</returns>
-        protected static List<T> ReadEvents<T>(PortableReaderImpl portableReader) where T : IEvent
-        {
-            var count = portableReader.RawReader().ReadInt();
-
-            if (count == -1)
-                return null;
-
-            var result = new List<T>(count);
-
-            for (var i = 0; i < count; i++)
-                result.Add(EventReader.Read<T>(portableReader));
-
-            return result;
-        }
-
-        /// <summary>
-        /// Gets local filters by user listener and event type.
-        /// </summary>
-        /// <param name="listener">Listener.</param>
-        /// <param name="types">Types.</param>
-        /// <returns>Collection of local listener wrappers.</returns>
-        [SuppressMessage("ReSharper", "InconsistentlySynchronizedField",
-            Justification = "This private method should be always called within a lock on localFilters")]
-        private IEnumerable<LocalHandledEventFilter> GetLocalFilters(object listener, int[] types)
-        {
-            Dictionary<int, LocalHandledEventFilter> filters;
-
-            if (!_localFilters.TryGetValue(listener, out filters))
-                return Enumerable.Empty<LocalHandledEventFilter>();
-
-            if (types.Length == 0)
-                return filters.Values;
-
-            return types.Select(type =>
-            {
-                LocalHandledEventFilter filter;
-
-                return filters.TryGetValue(type, out filter) ? filter : null;
-            }).Where(x => x != null);
-        }
-
-        /// <summary>
-        /// Adds an event listener for local events.
-        /// </summary>
-        /// <typeparam name="T">Type of events.</typeparam>
-        /// <param name="listener">Predicate that is called on each received event.</param>
-        /// <param name="type">Event type for which this listener will be notified</param>
-        private void LocalListen<T>(IEventFilter<T> listener, int type) where T : IEvent
-        {
-            lock (_localFilters)
-            {
-                Dictionary<int, LocalHandledEventFilter> filters;
-
-                if (!_localFilters.TryGetValue(listener, out filters))
-                {
-                    filters = new Dictionary<int, LocalHandledEventFilter>();
-
-                    _localFilters[listener] = filters;
-                }
-
-                LocalHandledEventFilter localFilter;
-
-                if (!filters.TryGetValue(type, out localFilter))
-                {
-                    localFilter = CreateLocalFilter(listener, type);
-
-                    filters[type] = localFilter;
-                }
-
-                UU.EventsLocalListen(Target, localFilter.Handle, type);
-            }
-        }
-
-        /// <summary>
-        /// Creates a user filter wrapper.
-        /// </summary>
-        /// <typeparam name="T">Event object type.</typeparam>
-        /// <param name="listener">Listener.</param>
-        /// <param name="type">Event type.</param>
-        /// <returns>Created wrapper.</returns>
-        private LocalHandledEventFilter CreateLocalFilter<T>(IEventFilter<T> listener, int type) where T : IEvent
-        {
-            var result = new LocalHandledEventFilter(
-                stream => InvokeLocalFilter(stream, listener),
-                unused =>
-                {
-                    lock (_localFilters)
-                    {
-                        Dictionary<int, LocalHandledEventFilter> filters;
-
-                        if (_localFilters.TryGetValue(listener, out filters))
-                        {
-                            filters.Remove(type);
-
-                            if (filters.Count == 0)
-                                _localFilters.Remove(listener);
-                        }
-                    }
-                });
-
-            result.Handle = Ignite.HandleRegistry.Allocate(result);
-
-            return result;
-        }
-
-        /// <summary>
-        /// Invokes local filter using data from specified stream.
-        /// </summary>
-        /// <typeparam name="T">Event object type.</typeparam>
-        /// <param name="stream">The stream.</param>
-        /// <param name="listener">The listener.</param>
-        /// <returns>Filter invocation result.</returns>
-        private bool InvokeLocalFilter<T>(IPortableStream stream, IEventFilter<T> listener) where T : IEvent
-        {
-            var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
-
-            // No guid in local mode
-            return listener.Invoke(Guid.Empty, evt);
-        }
-
-        /// <summary>
-        /// Writes the event types.
-        /// </summary>
-        /// <param name="types">Types.</param>
-        /// <param name="writer">Writer.</param>
-        private static void WriteEventTypes(int[] types, IPortableRawWriter writer)
-        {
-            if (types.Length == 0)
-                types = null;  // empty array means no type filtering
-
-            writer.WriteIntArray(types);
-        }
-
-        /// <summary>
-        /// Reads the event types.
-        /// </summary>
-        /// <param name="reader">Reader.</param>
-        private int[] ReadEventTypes(IPortableStream reader)
-        {
-            return Marshaller.StartUnmarshal(reader).ReadIntArray();
-        }
-
-        /// <summary>
-        /// Local user filter wrapper.
-        /// </summary>
-        private class LocalEventFilter : IInteropCallback
-        {
-            /** */
-            public Func<IPortableStream, bool> InvokeFunc;
-
-            /** <inheritdoc /> */
-            public int Invoke(IPortableStream stream)
-            {
-                return InvokeFunc(stream) ? 1 : 0;
-            }
-        }
-
-        /// <summary>
-        /// Local user filter wrapper with handle.
-        /// </summary>
-        private class LocalHandledEventFilter : Handle<Func<IPortableStream, bool>>, IInteropCallback
-        {
-            /** */
-            public long Handle;
-
-            /** <inheritdoc /> */
-            public int Invoke(IPortableStream stream)
-            {
-                return Target(stream) ? 1 : 0;
-            }
-
-            /// <summary>
-            /// Initializes a new instance of the <see cref="LocalHandledEventFilter"/> class.
-            /// </summary>
-            /// <param name="invokeFunc">The invoke function.</param>
-            /// <param name="releaseAction">The release action.</param>
-            public LocalHandledEventFilter(
-                Func<IPortableStream, bool> invokeFunc, Action<Func<IPortableStream, bool>> releaseAction) 
-                : base(invokeFunc, releaseAction)
-            {
-                // No-op.
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
deleted file mode 100644
index 632d8b8..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Events
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Diagnostics.CodeAnalysis;
-    using System.Threading;
-    using Apache.Ignite.Core.Cluster;
-    using Apache.Ignite.Core.Common;
-    using Apache.Ignite.Core.Events;
-    using Apache.Ignite.Core.Impl.Portable;
-    using Apache.Ignite.Core.Impl.Unmanaged;
-    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
-
-    /// <summary>
-    /// Async Ignite events.
-    /// </summary>
-    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
-    internal class EventsAsync : Events
-    {
-        /** */
-        private readonly ThreadLocal<int> _lastAsyncOp = new ThreadLocal<int>(() => OpNone);
-
-        /** */
-        private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>();
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="EventsAsync"/> class.
-        /// </summary>
-        /// <param name="target">Target.</param>
-        /// <param name="marsh">Marshaller.</param>
-        /// <param name="clusterGroup">Cluster group.</param>
-        public EventsAsync(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup)
-            : base(target, marsh, clusterGroup)
-        {
-            // No-op.
-        }
-
-        /** <inheritdoc /> */
-        public override List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
-        {
-            _lastAsyncOp.Value = (int) Op.RemoteQuery;
-
-            var result = base.RemoteQuery(filter, timeout, types);
-
-            // Result is a List<T> so we can't create proper converter later in GetFuture call from user.
-            // ReSharper disable once RedundantTypeArgumentsOfMethod (otherwise won't compile in VS2010 / TC)
-            _curFut.Value = GetFuture<List<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp,
-                (int) Op.RemoteQuery), convertFunc: ReadEvents<T>);
-
-            return result;
-        }
-
-        /** <inheritdoc /> */
-        public override Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
-            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
-        {
-            _lastAsyncOp.Value = (int) Op.RemoteListen;
-            _curFut.Value = null;
-
-            return base.RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, types);
-        }
-
-        /** <inheritdoc /> */
-        public override void StopRemoteListen(Guid opId)
-        {
-            _lastAsyncOp.Value = (int) Op.StopRemoteListen;
-            _curFut.Value = null;
-
-            base.StopRemoteListen(opId);
-        }
-
-        /** <inheritdoc /> */
-        public override T WaitForLocal<T>(IEventFilter<T> filter, params int[] types)
-        {
-            _lastAsyncOp.Value = (int) Op.WaitForLocal;
-
-            long hnd = 0;
-
-            try
-            {
-                var result = WaitForLocal0(filter, ref hnd, types);
-
-                if (filter != null)
-                {
-                    // Dispose handle as soon as future ends.
-                    var fut = GetFuture<T>();
-
-                    _curFut.Value = fut;
-
-                    fut.Listen(() => Ignite.HandleRegistry.Release(hnd));
-                }
-                else
-                    _curFut.Value = null;
-
-                return result;
-            }
-            catch (Exception)
-            {
-                Ignite.HandleRegistry.Release(hnd);
-                throw;
-            }
-        }
-
-        /** <inheritdoc /> */
-        public override IEvents WithAsync()
-        {
-            return this;
-        }
-
-        /** <inheritdoc /> */
-        public override bool IsAsync
-        {
-            get { return true; }
-        }
-
-        /** <inheritdoc /> */
-        public override IFuture GetFuture()
-        {
-            return GetFuture<object>();
-        }
-
-        /** <inheritdoc /> */
-        public override IFuture<T> GetFuture<T>()
-        {
-            if (_curFut.Value != null)
-            {
-                var fut = _curFut.Value;
-                _curFut.Value = null;
-                return (IFuture<T>) fut;
-            }
-
-            Func<PortableReaderImpl, T> converter = null;
-
-            if (_lastAsyncOp.Value == (int) Op.WaitForLocal)
-                converter = reader => (T) EventReader.Read<IEvent>(reader);
-
-            return GetFuture((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp, _lastAsyncOp.Value),
-                convertFunc: converter);
-        }
-    }
-}
\ No newline at end of file


Mime
View raw message