ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [40/55] [abbrv] ignite git commit: IGNITE-1348: Moved GridGain's .Net module to Ignite.
Date Fri, 04 Sep 2015 16:27:54 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
new file mode 100644
index 0000000..bf11397
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Threading;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Datastream;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+    /// <summary>
+    /// Data streamer internal interface to get rid of generics: it exposes the
+    /// topology callback without requiring knowledge of the streamer's key and
+    /// value type arguments.
+    /// </summary>
+    internal interface IDataStreamer
+    {
+        /// <summary>
+        /// Callback invoked on topology size change.
+        /// </summary>
+        /// <param name="topVer">New topology version.</param>
+        /// <param name="topSize">New topology size.</param>
+        void TopologyChange(long topVer, int topSize);
+    }
+
+    /// <summary>
+    /// Data streamer implementation.
+    /// </summary>
+    internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV>
+    {
+
+#pragma warning disable 0420
+
+        /** Policy: continue. Used by Add0 when a batch reaches the send threshold. */
+        internal const int PlcContinue = 0;
+
+        /** Policy: close. Used by Close when cancel is false. */
+        internal const int PlcClose = 1;
+
+        /** Policy: cancel and close. Used by Close when cancel is true and by the finalizer path. */
+        internal const int PlcCancelClose = 2;
+
+        /** Policy: flush. Used by TryFlush and Flush. */
+        internal const int PlcFlush = 3;
+        
+        /** Operation: update. Passed to DoOutOp by Update. */
+        private const int OpUpdate = 1;
+        
+        /** Operation: set receiver. Passed to DoOutOp by the Receiver setter. */
+        private const int OpReceiver = 2;
+        
+        /** Cache name. */
+        private readonly string _cacheName;
+
+        /**
+         * Lock. Taken in read mode by property getters and Update, and in write mode by
+         * property setters, TopologyChange and the disposal step of Close.
+         */
+        private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
+
+        /** Closed event. Set by Close once disposal is done; Flush and concurrent Close calls wait on it. */
+        private readonly ManualResetEventSlim _closedEvt = new ManualResetEventSlim(false);
+
+        /** Close future. Completed by Close and exposed through the Future property. */
+        private readonly Future<object> _closeFut = new Future<object>();
+
+        /** Handle-registry handle of a weak reference to this streamer; handed to native code for topology callbacks. */
+        private readonly long _hnd;
+                
+        /** Topology version. Updated by TopologyChange. */
+        private long _topVer;
+
+        /** Topology size. Updated by TopologyChange. */
+        private int _topSize;
+        
+        /** Buffer send size: topology size multiplied by per-node buffer size. Add0 schedules a flush at this batch size. */
+        private volatile int _bufSndSize;
+
+        /** Current data streamer batch. Becomes null once a close policy has been applied (see Flush0). */
+        private volatile DataStreamerBatch<TK, TV> _batch;
+
+        /** Flusher. Started at the end of the constructor and stopped at the beginning of Close. */
+        private readonly Flusher<TK, TV> _flusher;
+
+        /** Receiver. Null until one is assigned through the Receiver property. */
+        private volatile IStreamReceiver<TK, TV> _rcv;
+
+        /** Receiver handle: handle-registry handle of the holder wrapping the current receiver. */
+        private long _rcvHnd;
+
+        /** Receiver portable mode. */
+        private readonly bool _keepPortable;
+
+        /// <summary>
+        /// Constructor. Creates the initial empty batch, registers a weak reference to this
+        /// instance in the handle registry, subscribes to topology changes and finally starts
+        /// the flusher thread.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="cacheName">Cache name.</param>
+        /// <param name="keepPortable">Portable flag.</param>
+        public DataStreamerImpl(IUnmanagedTarget target, PortableMarshaller marsh, string cacheName, bool keepPortable)
+            : base(target, marsh)
+        {
+            _cacheName = cacheName;
+            _keepPortable = keepPortable;
+
+            // Create empty batch.
+            _batch = new DataStreamerBatch<TK, TV>();
+
+            // Allocate GC handle so that this data streamer could be easily dereferenced from native code.
+            // The same weak reference is later given to the flusher, so neither the registry entry
+            // nor the flusher holds a strong reference to this streamer.
+            WeakReference thisRef = new WeakReference(this);
+
+            _hnd = marsh.Ignite.HandleRegistry.Allocate(thisRef);
+
+            // Start topology listening. This call will ensure that buffer size member is updated.
+            UU.DataStreamerListenTopology(target, _hnd);
+
+            // Membar to ensure fields initialization before leaving constructor.
+            Thread.MemoryBarrier();
+
+            // Start flusher after everything else is initialized.
+            _flusher = new Flusher<TK, TV>(thisRef);
+
+            _flusher.RunThread();
+        }
+
+        /** <inheritDoc /> */
+        public string CacheName
+        {
+            get
+            {
+                // Fixed at construction time; no locking or disposed check is needed.
+                return _cacheName;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public bool AllowOverwrite
+        {
+            get
+            {
+                bool allowOverwrite;
+
+                _rwLock.EnterReadLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    // Delegate to the unmanaged target while holding the read lock.
+                    allowOverwrite = UU.DataStreamerAllowOverwriteGet(Target);
+                }
+                finally
+                {
+                    _rwLock.ExitReadLock();
+                }
+
+                return allowOverwrite;
+            }
+            set
+            {
+                // Setters take the write lock so they are exclusive with readers and with Close.
+                _rwLock.EnterWriteLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+                    UU.DataStreamerAllowOverwriteSet(Target, value);
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public bool SkipStore
+        {
+            get
+            {
+                bool skipStore;
+
+                _rwLock.EnterReadLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    // Delegate to the unmanaged target while holding the read lock.
+                    skipStore = UU.DataStreamerSkipStoreGet(Target);
+                }
+                finally
+                {
+                    _rwLock.ExitReadLock();
+                }
+
+                return skipStore;
+            }
+            set
+            {
+                // Setters take the write lock so they are exclusive with readers and with Close.
+                _rwLock.EnterWriteLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+                    UU.DataStreamerSkipStoreSet(Target, value);
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public int PerNodeBufferSize
+        {
+            get
+            {
+                int perNodeBufSize;
+
+                _rwLock.EnterReadLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    // Delegate to the unmanaged target while holding the read lock.
+                    perNodeBufSize = UU.DataStreamerPerNodeBufferSizeGet(Target);
+                }
+                finally
+                {
+                    _rwLock.ExitReadLock();
+                }
+
+                return perNodeBufSize;
+            }
+            set
+            {
+                _rwLock.EnterWriteLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+                    UU.DataStreamerPerNodeBufferSizeSet(Target, value);
+
+                    // Keep the managed send threshold in sync with the new per-node size.
+                    _bufSndSize = _topSize * value;
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public int PerNodeParallelOperations
+        {
+            get
+            {
+                int parallelOps;
+
+                _rwLock.EnterReadLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    // Delegate to the unmanaged target while holding the read lock.
+                    parallelOps = UU.DataStreamerPerNodeParallelOperationsGet(Target);
+                }
+                finally
+                {
+                    _rwLock.ExitReadLock();
+                }
+
+                return parallelOps;
+            }
+            set
+            {
+                // Setters take the write lock so they are exclusive with readers and with Close.
+                _rwLock.EnterWriteLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+                    UU.DataStreamerPerNodeParallelOperationsSet(Target, value);
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public long AutoFlushFrequency
+        {
+            get
+            {
+                long frequency;
+
+                _rwLock.EnterReadLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    // The frequency is owned by the flusher, not by the unmanaged target.
+                    frequency = _flusher.Frequency;
+                }
+                finally
+                {
+                    _rwLock.ExitReadLock();
+                }
+
+                return frequency;
+            }
+            set
+            {
+                _rwLock.EnterWriteLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    // The flusher wakes its thread itself when the value actually changes.
+                    _flusher.Frequency = value;
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public IFuture Future
+        {
+            get
+            {
+                ThrowIfDisposed();
+
+                // The close future is the one completed by Close.
+                IFuture closeFuture = _closeFut;
+
+                return closeFuture;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public IStreamReceiver<TK, TV> Receiver
+        {
+            get
+            {
+                ThrowIfDisposed();
+
+                return _rcv;
+            }
+            set
+            {
+                IgniteArgumentCheck.NotNull(value, "value");
+
+                var handleRegistry = Marshaller.Ignite.HandleRegistry;
+
+                _rwLock.EnterWriteLock();
+
+                try
+                {
+                    ThrowIfDisposed();
+
+                    // Same receiver instance is already installed: nothing to do.
+                    if (_rcv == value)
+                        return;
+
+                    // Wrap the typed receiver into a non-generic holder; the lambda restores
+                    // the TK/TV type arguments when the holder is invoked.
+                    var rcvHolder = new StreamReceiverHolder(value,
+                        (rec, grid, cache, stream, keepPortable) =>
+                            StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream,
+                                keepPortable));
+
+                    var rcvHnd0 = handleRegistry.Allocate(rcvHolder);
+
+                    try
+                    {
+                        // Send both the new handle and the holder itself to the unmanaged target.
+                        DoOutOp(OpReceiver, w =>
+                        {
+                            w.WriteLong(rcvHnd0);
+
+                            w.WriteObject(rcvHolder);
+                        });
+                    }
+                    catch (Exception)
+                    {
+                        // The operation failed, so the freshly allocated handle must not leak.
+                        handleRegistry.Release(rcvHnd0);
+                        throw;
+                    }
+
+                    // Release the handle of the previous receiver only after the new one is in place.
+                    if (_rcv != null)
+                        handleRegistry.Release(_rcvHnd);
+
+                    _rcv = value;
+                    _rcvHnd = rcvHnd0;
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock();
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public IFuture AddData(TK key, TV val)
+        {
+            ThrowIfDisposed();
+
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            // A single key/value pair counts as one item in the batch.
+            var entry = new DataStreamerEntry<TK, TV>(key, val);
+
+            return Add0(entry, 1);
+        }
+
+        /** <inheritDoc /> */
+        public IFuture AddData(KeyValuePair<TK, TV> pair)
+        {
+            ThrowIfDisposed();
+
+            // Unpack the pair into a streamer entry; it counts as one item in the batch.
+            var entry = new DataStreamerEntry<TK, TV>(pair.Key, pair.Value);
+
+            return Add0(entry, 1);
+        }
+        
+        /** <inheritDoc /> */
+        public IFuture AddData(ICollection<KeyValuePair<TK, TV>> entries)
+        {
+            ThrowIfDisposed();
+
+            IgniteArgumentCheck.NotNull(entries, "entries");
+
+            // The whole collection is added as one value; its size is the item count.
+            int itemCnt = entries.Count;
+
+            return Add0(entries, itemCnt);
+        }
+
+        /** <inheritDoc /> */
+        public IFuture RemoveData(TK key)
+        {
+            ThrowIfDisposed();
+
+            IgniteArgumentCheck.NotNull(key, "key");
+
+            // Removal is streamed as a marker entry that carries only the key.
+            var removeMarker = new DataStreamerRemoveEntry<TK>(key);
+
+            return Add0(removeMarker, 1);
+        }
+
+        /** <inheritDoc /> */
+        public void TryFlush()
+        {
+            ThrowIfDisposed();
+
+            // Snapshot the volatile field once; it may be swapped concurrently.
+            var current = _batch;
+
+            if (current == null)
+                return;
+
+            // Do not wait for completion.
+            Flush0(current, false, PlcFlush);
+        }
+
+        /** <inheritDoc /> */
+        public void Flush()
+        {
+            ThrowIfDisposed();
+
+            // Snapshot the volatile field once; it may be swapped concurrently.
+            var current = _batch;
+
+            if (current == null)
+            {
+                // Batch is null, i.e. data streamer is closing. Wait for close to complete.
+                _closedEvt.Wait();
+
+                return;
+            }
+
+            // Send the batch and block until it completes.
+            Flush0(current, true, PlcFlush);
+        }
+
+        /** <inheritDoc /> */
+        public void Close(bool cancel)
+        {
+            // Stop the flusher first; Stop() returns only after the flusher reports the stopped state.
+            _flusher.Stop();
+
+            while (true)
+            {
+                DataStreamerBatch<TK, TV> batch0 = _batch;
+
+                if (batch0 == null)
+                {
+                    // Wait for concurrent close to finish.
+                    _closedEvt.Wait();
+
+                    return;
+                }
+
+                // Flush0 returns true only when this call replaced the current batch (with null for
+                // close policies); otherwise another thread swapped the batch and we retry.
+                if (Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
+                {
+                    _closeFut.OnDone(null, null);
+
+                    _rwLock.EnterWriteLock(); 
+                    
+                    try
+                    {
+                        base.Dispose(true);
+
+                        // A receiver handle exists only if a receiver was ever assigned.
+                        if (_rcv != null)
+                            Marshaller.Ignite.HandleRegistry.Release(_rcvHnd);
+
+                        // Wake up Flush() and concurrent Close() callers waiting for the close to finish.
+                        _closedEvt.Set();
+                    }
+                    finally
+                    {
+                        _rwLock.ExitWriteLock();
+                    }
+
+                    // Finally release the handle that was allocated for this streamer in the constructor.
+                    Marshaller.Ignite.HandleRegistry.Release(_hnd);
+
+                    break;
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public IDataStreamer<TK1, TV1> WithKeepPortable<TK1, TV1>()
+        {
+            if (!_keepPortable)
+            {
+                // Not in portable mode yet: create a new portable-mode streamer for the same cache.
+                var portableTarget = UU.ProcessorDataStreamer(Marshaller.Ignite.InteropProcessor,
+                    _cacheName, true);
+
+                return new DataStreamerImpl<TK1, TV1>(portableTarget, Marshaller, _cacheName, true);
+            }
+
+            // Already in portable mode: this instance can be reused only if the type arguments match.
+            var self = this as IDataStreamer<TK1, TV1>;
+
+            if (self != null)
+                return self;
+
+            throw new InvalidOperationException(
+                "Can't change type of portable streamer. WithKeepPortable has been called on an instance of " +
+                "portable streamer with incompatible generic arguments.");
+        }
+
+        /// <summary>
+        /// Disposes the streamer. An explicit dispose performs a regular, non-cancelling
+        /// <c>Close</c>; the finalizer path sends the current batch with the cancel-and-close
+        /// policy and releases the registry handles.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose, false when called from the finalizer.</param>
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+                Close(false);  // Normal dispose: do not cancel
+            else
+            {
+                // Finalizer: just close Java streamer
+                try
+                {
+                    if (_batch != null)
+                        _batch.Send(this, PlcCancelClose);
+                }
+                catch (Exception)
+                {
+                    // Finalizers should never throw
+                }
+
+                // NOTE(review): the second argument is presumably a "quiet" flag that tolerates
+                // missing handles (_rcvHnd is released here even if no receiver was set) - confirm
+                // against HandleRegistry.Release.
+                Marshaller.Ignite.HandleRegistry.Release(_hnd, true);
+                Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true);
+
+                base.Dispose(false);
+            }
+        }
+
+        /// <summary>
+        /// Finalizer: runs the non-disposing cleanup path of <c>Dispose(bool)</c>.
+        /// </summary>
+        ~DataStreamerImpl()
+        {
+            Dispose(false);
+        }
+
+        /** <inheritDoc /> */
+        public void TopologyChange(long topVer, int topSize)
+        {
+            _rwLock.EnterWriteLock();
+
+            try
+            {
+                ThrowIfDisposed();
+
+                // Ignore notifications that are not newer than the version already applied.
+                if (topVer <= _topVer)
+                    return;
+
+                _topVer = topVer;
+                _topSize = topSize;
+
+                // Recompute the send threshold for the new topology size.
+                int perNodeBufSize = UU.DataStreamerPerNodeBufferSizeGet(Target);
+
+                _bufSndSize = topSize * perNodeBufSize;
+            }
+            finally
+            {
+                _rwLock.ExitWriteLock();
+            }
+        }
+
+        /// <summary>
+        /// Internal add/remove routine. Adds the value to the current batch, replacing the
+        /// batch and retrying when it no longer accepts data, and schedules a non-blocking
+        /// flush once the batch size reaches the buffer send size.
+        /// </summary>
+        /// <param name="val">Value.</param>
+        /// <param name="cnt">Items count.</param>
+        /// <returns>Future of the batch the value was added to.</returns>
+        /// <exception cref="InvalidOperationException">If the current batch is null, i.e. the streamer is stopped.</exception>
+        private IFuture Add0(object val, int cnt)
+        {
+            // Threshold is read once and reused across retries.
+            int bufSndSize0 = _bufSndSize;
+
+            while (true)
+            {
+                var batch0 = _batch;
+
+                if (batch0 == null)
+                    throw new InvalidOperationException("Data streamer is stopped.");
+
+                int size = batch0.Add(val, cnt);
+
+                if (size == -1)
+                {
+                    // Batch is blocked, perform CAS.
+                    // Whether or not this CAS wins, the next iteration re-reads _batch.
+                    Interlocked.CompareExchange(ref _batch,
+                        new DataStreamerBatch<TK, TV>(batch0), batch0);
+
+                    continue;
+                }
+                if (size >= bufSndSize0)
+                    // Batch is too big, schedule flush.
+                    Flush0(batch0, false, PlcContinue);
+
+                return batch0.Future;
+            }
+        }
+
+        /// <summary>
+        /// Internal flush routine. Tries to replace the current batch, sends the given batch
+        /// with the given policy and optionally waits for its completion.
+        /// </summary>
+        /// <param name="curBatch">Batch to send; also the expected current batch for the CAS.</param>
+        /// <param name="wait">Whether to wait for flush to complete.</param>
+        /// <param name="plc">Policy (one of the Plc* constants). For continue and flush the current
+        /// batch is replaced with a new batch, for the close policies it is replaced with null.</param>
+        /// <returns>Whether this call was able to CAS previous batch</returns>
+        private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc)
+        {
+            // 1. Try setting new current batch to help further adders. 
+            bool res = Interlocked.CompareExchange(ref _batch, 
+                (plc == PlcContinue || plc == PlcFlush) ? 
+                new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch;
+
+            // 2. Perform actual send.
+            // Note that the send is performed regardless of the CAS outcome.
+            curBatch.Send(this, plc);
+
+            if (wait)
+                // 3. Wait for all futures to finish.
+                curBatch.AwaitCompletion();
+
+            return res;
+        }
+
+        /// <summary>
+        /// Performs the update operation (<c>OpUpdate</c>) through <c>DoOutOp</c> under the
+        /// read lock, passing the supplied action along.
+        /// </summary>
+        /// <param name="action">Action receiving the portable writer.</param>
+        internal void Update(Action<PortableWriterImpl> action)
+        {
+            // Read lock: updates may run concurrently with each other, but not with setters or Close.
+            _rwLock.EnterReadLock();
+
+            try
+            {
+                ThrowIfDisposed();
+
+                DoOutOp(OpUpdate, action);
+            }
+            finally
+            {
+                _rwLock.ExitReadLock();
+            }
+        }
+
+        /// <summary>
+        /// Flusher. Owns a thread that periodically calls <c>TryFlush</c> on the data streamer
+        /// according to the configured frequency. The streamer is referenced weakly, so the
+        /// flusher does not keep it alive.
+        /// </summary>
+        private class Flusher<TK1, TV1>
+        {
+            /** State: running. */
+            private const int StateRunning = 0;
+
+            /** State: stopping. */
+            private const int StateStopping = 1;
+
+            /** State: stopped. */
+            private const int StateStopped = 2;
+
+            /** Wait interval, in milliseconds, used while automatic flush is disabled. */
+            private const int IdleWaitMillis = 1000;
+
+            /** Data streamer. */
+            private readonly WeakReference _ldrRef;
+
+            /** Current state (one of the State* constants); accessed under lock (this). */
+            private int _state;
+
+            /** Flush frequency in milliseconds; values less than or equal to zero disable automatic flush. */
+            private long _freq;
+
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="ldrRef">Data streamer weak reference.</param>
+            public Flusher(WeakReference ldrRef)
+            {
+                _ldrRef = ldrRef;
+
+                lock (this)
+                {
+                    _state = StateRunning;
+                }
+            }
+
+            /// <summary>
+            /// Main flusher routine. Loops until a stop is requested or the streamer has been
+            /// garbage collected, flushing whenever the frequency is positive and once more
+            /// right after a non-zero frequency has been changed.
+            /// </summary>
+            private void Run()
+            {
+                bool force = false;
+                long curFreq = 0;
+
+                try
+                {
+                    while (true)
+                    {
+                        if (curFreq > 0 || force)
+                        {
+                            var ldr = _ldrRef.Target as DataStreamerImpl<TK1, TV1>;
+
+                            if (ldr == null)
+                                return;
+
+                            ldr.TryFlush();
+
+                            force = false;
+                        }
+                        else if (!_ldrRef.IsAlive)
+                        {
+                            // Automatic flush is disabled and the streamer was collected without
+                            // Close(): nobody can call Stop() any more, so exit instead of polling
+                            // forever. IsAlive is used to avoid creating a strong reference.
+                            return;
+                        }
+
+                        lock (this)
+                        {
+                            // Stop immediately.
+                            if (_state == StateStopping)
+                                return;
+
+                            if (curFreq == _freq)
+                            {
+                                // Frequency is unchanged.
+                                if (curFreq <= 0)
+                                {
+                                    // Disabled (a negative value is treated like zero rather than
+                                    // passed to Monitor.Wait, which rejects negative timeouts):
+                                    // just wait for a second and re-try.
+                                    Monitor.Wait(this, IdleWaitMillis);
+                                }
+                                else
+                                {
+                                    // Monitor.Wait accepts at most int.MaxValue milliseconds, so
+                                    // clamp in milliseconds (not in ticks, which would silently cap
+                                    // the interval at about 214 seconds).
+                                    int timeout = curFreq > int.MaxValue ? int.MaxValue : (int) curFreq;
+
+                                    Monitor.Wait(this, timeout);
+                                }
+                            }
+                            else
+                            {
+                                // Frequency changed: flush once immediately if flushing was active.
+                                if (curFreq != 0)
+                                    force = true;
+
+                                curFreq = _freq;
+                            }
+                        }
+                    }
+                }
+                finally
+                {
+                    // Let streamer know about stop.
+                    lock (this)
+                    {
+                        _state = StateStopped;
+
+                        Monitor.PulseAll(this);
+                    }
+                }
+            }
+
+            /// <summary>
+            /// Frequency in milliseconds. Setting a different value wakes the flusher thread
+            /// so that the new value takes effect without waiting for the current interval.
+            /// </summary>
+            public long Frequency
+            {
+                get
+                {
+                    return Interlocked.Read(ref _freq);
+                }
+
+                set
+                {
+                    lock (this)
+                    {
+                        if (_freq != value)
+                        {
+                            _freq = value;
+
+                            Monitor.PulseAll(this);
+                        }
+                    }
+                }
+            }
+
+            /// <summary>
+            /// Stop flusher. Requests the stop if the flusher is still running and blocks
+            /// until the flusher thread has reported the stopped state.
+            /// </summary>
+            public void Stop()
+            {
+                lock (this)
+                {
+                    if (_state == StateRunning)
+                    {
+                        _state = StateStopping;
+
+                        Monitor.PulseAll(this);
+                    }
+
+                    while (_state != StateStopped)
+                        Monitor.Wait(this);
+                }
+            }
+
+            /// <summary>
+            /// Runs the flusher thread.
+            /// </summary>
+            public void RunThread()
+            {
+                var thread = new Thread(Run);
+
+                // Background thread: a streamer that was never closed must not keep the
+                // process alive just because its flusher is still polling.
+                thread.IsBackground = true;
+
+                thread.Start();
+            }
+        }
+
+#pragma warning restore 0420
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
new file mode 100644
index 0000000..7e65934
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+    /// <summary>
+    /// Remove marker: an immutable wrapper around the key of an entry to be removed.
+    /// </summary>
+    internal class DataStreamerRemoveEntry<TK>
+    {
+        /** Key captured at construction time. */
+        private readonly TK _removedKey;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="key">Key.</param>
+        public DataStreamerRemoveEntry(TK key)
+        {
+            _removedKey = key;
+        }
+
+        /// <summary>
+        /// Key.
+        /// </summary>
+        public TK Key
+        {
+            get { return _removedKey; }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
new file mode 100644
index 0000000..5a7c104
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Datastream;
+    using Apache.Ignite.Core.Impl.Cache;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// Portable wrapper for <see cref="IStreamReceiver{TK,TV}"/>: keeps the receiver as a plain
+    /// object together with a delegate that knows how to invoke it.
+    /// </summary>
+    internal class StreamReceiverHolder : IPortableWriteAware
+    {
+        /** Receiver type marker written for a receiver that does not write itself. */
+        private const byte RcvNormal = 0;
+
+        /** Receiver type marker: the read object is wrapped into a stream transformer. */
+        public const byte RcvTransformer = 1;
+
+        /** Generic receiver. */
+        private readonly object _receiver;
+
+        /** Invoker delegate. */
+        private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _invoker;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class
+        /// from its serialized form.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public StreamReceiverHolder(PortableReaderImpl reader)
+        {
+            var kind = reader.ReadByte();
+
+            object rcv = PortableUtils.ReadPortableOrSerializable<object>(reader);
+
+            Debug.Assert(rcv != null);
+
+            var userType = rcv.GetType();
+
+            if (kind == RcvTransformer)
+            {
+                // rcv is a user ICacheEntryProcessor<K, V, A, R>, construct StreamTransformer from it.
+                // (we can't marshal StreamTransformer directly, because it is generic,
+                // and we do not know type arguments that user will have)
+                rcv = DelegateTypeDescriptor.GetStreamTransformerCtor(userType)(rcv);
+            }
+
+            _receiver = rcv;
+            _invoker = DelegateTypeDescriptor.GetStreamReceiver(rcv.GetType());
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class
+        /// from an existing receiver and its invoker.
+        /// </summary>
+        /// <param name="rcv">Receiver.</param>
+        /// <param name="invoke">Invoke delegate.</param>
+        public StreamReceiverHolder(object rcv,
+            Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> invoke)
+        {
+            Debug.Assert(rcv != null);
+            Debug.Assert(invoke != null);
+
+            _receiver = rcv;
+            _invoker = invoke;
+        }
+
+        /** <inheritdoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var raw = writer.RawWriter();
+
+            var selfWriting = _receiver as IPortableWriteAware;
+
+            if (selfWriting == null)
+            {
+                // Plain receiver: write the type marker followed by the receiver itself.
+                raw.WriteByte(RcvNormal);
+                PortableUtils.WritePortableOrSerializable((PortableWriterImpl) writer, _receiver);
+
+                return;
+            }
+
+            // The receiver writes itself.
+            selfWriting.WritePortable(writer);
+        }
+
+        /// <summary>
+        /// Updates cache with batch of entries by passing the held receiver to the invoker delegate.
+        /// </summary>
+        /// <param name="grid">The grid.</param>
+        /// <param name="cache">Cache.</param>
+        /// <param name="stream">Stream.</param>
+        /// <param name="keepPortable">Portable flag.</param>
+        public void Receive(Ignite grid, IUnmanagedTarget cache, IPortableStream stream, bool keepPortable)
+        {
+            Debug.Assert(grid != null);
+            Debug.Assert(cache != null);
+            Debug.Assert(stream != null);
+
+            _invoker(_receiver, grid, cache, stream, keepPortable);
+        }
+
+        /// <summary>
+        /// Invokes the receiver: reads an entry count followed by that many key/value pairs
+        /// from the stream and hands them to the receiver together with the cache.
+        /// </summary>
+        /// <param name="receiver">Receiver.</param>
+        /// <param name="grid">Grid.</param>
+        /// <param name="cache">Cache.</param>
+        /// <param name="stream">Stream.</param>
+        /// <param name="keepPortable">Portable flag.</param>
+        public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, IUnmanagedTarget cache,
+            IPortableStream stream, bool keepPortable)
+        {
+            var reader = grid.Marshaller.StartUnmarshal(stream, keepPortable);
+
+            var count = reader.ReadInt();
+
+            var batch = new List<ICacheEntry<TK, TV>>(count);
+
+            for (var idx = 0; idx < count; idx++)
+            {
+                // Key is read before value, matching the order they appear in the stream.
+                var key = reader.ReadObject<TK>();
+                var val = reader.ReadObject<TV>();
+
+                batch.Add(new CacheEntry<TK, TV>(key, val));
+            }
+
+            receiver.Receive(grid.Cache<TK, TV>(cache, keepPortable), batch);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
new file mode 100644
index 0000000..3972bb0
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Events
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Events;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Handle;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using Apache.Ignite.Core.Portable;
+    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+    /// <summary>
+    /// Ignite events.
+    /// </summary>
+    internal class Events : PlatformTarget, IEvents
+    {
+        /// <summary>
+        /// Operation codes. Members are cast to <c>int</c> and passed as the operation type
+        /// to the DoOutOp / DoInOp / DoOutInOp calls made by this class.
+        /// </summary>
+        protected enum Op
+        {
+            RemoteQuery = 1,
+            RemoteListen = 2,
+            StopRemoteListen = 3,
+            WaitForLocal = 4,
+            LocalQuery = 5,
+            RecordLocal = 6,
+            // NOTE(review): value 7 is not assigned here; presumably reserved on the native side - confirm.
+            EnableLocal = 8,
+            DisableLocal = 9,
+            GetEnabledEvents = 10
+        }
+
+        /** Map: user listener -> (event type -> local wrapper), needed for invoke/unsubscribe.
+         *  The dictionary instance itself is used as the lock object that guards access to it. */
+        private readonly Dictionary<object, Dictionary<int, LocalHandledEventFilter>> _localFilters
+            = new Dictionary<object, Dictionary<int, LocalHandledEventFilter>>();
+
+        /** Grid. */
+        protected readonly Ignite Ignite;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Events"/> class.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="clusterGroup">Cluster group; must not be null (checked by a debug assertion).</param>
+        public Events(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup)
+            : base(target, marsh)
+        {
+            Debug.Assert(clusterGroup != null);
+
+            ClusterGroup = clusterGroup;
+
+            // The concrete Ignite type is needed because this class uses its HandleRegistry.
+            Ignite = (Ignite) clusterGroup.Ignite;
+        }
+
+        /** <inheritDoc /> */
+        public virtual IEvents WithAsync()
+        {
+            // Wrap the target returned by UU.EventsWithAsync, reusing this instance's marshaller and cluster group.
+            return new EventsAsync(UU.EventsWithAsync(Target), Marshaller, ClusterGroup);
+        }
+
+        /** <inheritDoc /> */
+        public virtual bool IsAsync
+        {
+            // Synchronous implementation; EventsAsync overrides this to return true.
+            get { return false; }
+        }
+
+        /** <inheritDoc /> */
+        public virtual IFuture GetFuture()
+        {
+            // No futures in synchronous mode; EventsAsync overrides this.
+            throw IgniteUtils.GetAsyncModeDisabledException();
+        }
+
+        /** <inheritDoc /> */
+        public virtual IFuture<TResult> GetFuture<TResult>()
+        {
+            // No futures in synchronous mode; EventsAsync overrides this.
+            throw IgniteUtils.GetAsyncModeDisabledException();
+        }
+
+        /** <inheritDoc /> */
+        public IClusterGroup ClusterGroup { get; private set; }
+
+        /** <inheritDoc /> */
+        public virtual List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
+            where T : IEvent
+        {
+            IgniteArgumentCheck.NotNull(filter, "filter");
+
+            // A missing timeout is sent as 0.
+            var timeoutMs = timeout.HasValue ? (long) timeout.Value.TotalMilliseconds : 0L;
+
+            return DoOutInOp((int) Op.RemoteQuery,
+                output =>
+                {
+                    // Wire order: filter holder, timeout in milliseconds, event types.
+                    output.Write(new PortableOrSerializableObjectHolder(filter));
+                    output.WriteLong(timeoutMs);
+
+                    WriteEventTypes(types, output);
+                },
+                input => ReadEvents<T>(input));
+        }
+
+        /** <inheritDoc /> */
+        public virtual Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
+            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
+            where T : IEvent
+        {
+            IgniteArgumentCheck.Ensure(bufSize > 0, "bufSize", "should be > 0");
+
+            // The message now states the condition that is actually enforced (strictly positive).
+            IgniteArgumentCheck.Ensure(interval == null || interval.Value.TotalMilliseconds > 0, "interval", "should be null or > 0");
+
+            return DoOutInOp((int) Op.RemoteListen,
+                writer =>
+                {
+                    writer.WriteInt(bufSize);
+
+                    // A missing interval is sent as 0.
+                    writer.WriteLong((long) (interval == null ? 0 : interval.Value.TotalMilliseconds));
+                    writer.WriteBoolean(autoUnsubscribe);
+
+                    // Optional local listener: presence flag, then the handle of its wrapper.
+                    writer.WriteBoolean(localListener != null);
+
+                    if (localListener != null)
+                    {
+                        var listener = new RemoteListenEventFilter(Ignite, (id, e) => localListener.Invoke(id, (T) e));
+                        writer.WriteLong(Ignite.HandleRegistry.Allocate(listener));
+                    }
+
+                    // Optional remote filter: presence flag, then the filter wrapped in a holder.
+                    writer.WriteBoolean(remoteFilter != null);
+
+                    if (remoteFilter != null)
+                        writer.Write(new PortableOrSerializableObjectHolder(remoteFilter));
+
+                    WriteEventTypes(types, writer);
+                },
+                // A null Guid in the response is reported as Guid.Empty.
+                reader => Marshaller.StartUnmarshal(reader).ReadGuid() ?? Guid.Empty);
+        }
+
+        /** <inheritDoc /> */
+        public virtual void StopRemoteListen(Guid opId)
+        {
+            // The only payload of this operation is the listen operation id.
+            DoOutOp((int) Op.StopRemoteListen, writer =>
+            {
+                Marshaller.StartMarshal(writer).WriteGuid(opId);
+            });
+        }
+
+        /** <inheritDoc /> */
+        public IEvent WaitForLocal(params int[] types)
+        {
+            // No filter: delegates to the virtual generic overload, so EventsAsync's override applies as well.
+            return WaitForLocal<IEvent>(null, types);
+        }
+
+        /** <inheritDoc /> */
+        public virtual T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent
+        {
+            // Filled in by WaitForLocal0 when a filter is supplied.
+            long filterHandle = 0;
+
+            try
+            {
+                return WaitForLocal0(filter, ref filterHandle, types);
+            }
+            finally
+            {
+                // The filter wrapper is needed only for the duration of the wait.
+                if (filter != null)
+                {
+                    Ignite.HandleRegistry.Release(filterHandle);
+                }
+            }
+        }
+
+        /** <inheritDoc /> */
+        public List<IEvent> LocalQuery(params int[] types)
+        {
+            // Sends the event types and reads back the list of matching events.
+            return DoOutInOp((int) Op.LocalQuery,
+                writer => WriteEventTypes(types, writer),
+                reader => ReadEvents<IEvent>(reader));
+        }
+
+        /** <inheritDoc /> */
+        public void RecordLocal(IEvent evt)
+        {
+            // Not implemented. NOTE(review): "GG-10244" looks like a tracking ticket id - confirm.
+            throw new NotImplementedException("GG-10244");
+        }
+
+        /** <inheritDoc /> */
+        public void LocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
+        {
+            IgniteArgumentCheck.NotNull(listener, "listener");
+            IgniteArgumentCheck.NotNullOrEmpty(types, "types");
+
+            // Subscribe once per requested event type.
+            for (var i = 0; i < types.Length; i++)
+            {
+                LocalListen(listener, types[i]);
+            }
+        }
+
+        /** <inheritDoc /> */
+        public bool StopLocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
+        {
+            // Same explicit guard as LocalListen; previously a null listener failed inside the dictionary lookup.
+            IgniteArgumentCheck.NotNull(listener, "listener");
+
+            lock (_localFilters)
+            {
+                var success = false;
+
+                // Should do this inside lock to avoid race with subscription.
+                // GetLocalFilters yields nothing for an unknown listener, so no separate lookup is needed here.
+                // ToArray is required because we are going to modify underlying dictionary during enumeration
+                foreach (var filter in GetLocalFilters(listener, types).ToArray())
+                    success |= UU.EventsStopLocalListen(Target, filter.Handle);
+
+                return success;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void EnableLocal(params int[] types)
+        {
+            // At least one event type is required.
+            IgniteArgumentCheck.NotNullOrEmpty(types, "types");
+
+            DoOutOp((int)Op.EnableLocal, writer => WriteEventTypes(types, writer));
+        }
+
+        /** <inheritDoc /> */
+        public void DisableLocal(params int[] types)
+        {
+            // At least one event type is required.
+            IgniteArgumentCheck.NotNullOrEmpty(types, "types");
+
+            DoOutOp((int)Op.DisableLocal, writer => WriteEventTypes(types, writer));
+        }
+
+        /** <inheritDoc /> */
+        public int[] GetEnabledEvents()
+        {
+            // The response is a single int array of event types.
+            return DoInOp((int)Op.GetEnabledEvents, reader => ReadEventTypes(reader));
+        }
+
+        /** <inheritDoc /> */
+        public bool IsEnabled(int type)
+        {
+            // Direct unmanaged call; no stream marshalling involved.
+            return UU.EventsIsEnabled(Target, type);
+        }
+
+        /// <summary>
+        /// Waits for the specified events.
+        /// When a filter is supplied, a handle is allocated for its wrapper in the handle registry and returned
+        /// through <paramref name="handle"/>; this method does not release it, the callers do.
+        /// </summary>
+        /// <typeparam name="T">Type of events.</typeparam>
+        /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param>
+        /// <param name="handle">The filter handle, if applicable. Left unchanged when filter is null.</param>
+        /// <param name="types">Types of the events to wait for.
+        /// If not provided, all events will be passed to the filter.</param>
+        /// <returns>Ignite event.</returns>
+        protected T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent
+        {
+            if (filter != null)
+                handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter
+                {
+                    InvokeFunc = stream => InvokeLocalFilter(stream, filter)
+                });
+
+            // A ref parameter cannot be captured by a lambda, so copy it into a local first.
+            var hnd = handle;
+
+            return DoOutInOp((int)Op.WaitForLocal,
+                writer =>
+                {
+                    // Presence flag, followed by the filter handle when a filter exists.
+                    if (filter != null)
+                    {
+                        writer.WriteBoolean(true);
+                        writer.WriteLong(hnd);
+                    }
+                    else
+                        writer.WriteBoolean(false);
+
+                    WriteEventTypes(types, writer);
+                },
+                reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader)));
+        }
+
+        /// <summary>
+        /// Reads events from a portable stream by starting an unmarshal session on it
+        /// and delegating to the reader-based overload.
+        /// </summary>
+        /// <typeparam name="T">Event type.</typeparam>
+        /// <param name="reader">Stream to read from.</param>
+        /// <returns>Resulting list or null.</returns>
+        private List<T> ReadEvents<T>(IPortableStream reader) where T : IEvent
+        {
+            return ReadEvents<T>(Marshaller.StartUnmarshal(reader));
+        }
+
+        /// <summary>
+        /// Reads a counted sequence of events from a portable reader.
+        /// </summary>
+        /// <typeparam name="T">Event type.</typeparam>
+        /// <param name="portableReader">Reader positioned at the event count.</param>
+        /// <returns>List of events, or null when the count read is -1.</returns>
+        protected static List<T> ReadEvents<T>(PortableReaderImpl portableReader) where T : IEvent
+        {
+            var total = portableReader.RawReader().ReadInt();
+
+            // -1 marks a null list.
+            if (total == -1)
+            {
+                return null;
+            }
+
+            var events = new List<T>(total);
+
+            var read = 0;
+
+            while (read < total)
+            {
+                events.Add(EventReader.Read<T>(portableReader));
+
+                read++;
+            }
+
+            return events;
+        }
+
+        /// <summary>
+        /// Gets local filters by user listener and event type.
+        /// Must be called while holding the lock on the local filters map.
+        /// </summary>
+        /// <param name="listener">Listener.</param>
+        /// <param name="types">Types. Null or empty selects every filter registered for the listener.</param>
+        /// <returns>Collection of local listener wrappers; empty when the listener is unknown.
+        /// The result may be a live view or a lazy query over the underlying dictionary, so callers
+        /// should materialize it before modifying the map.</returns>
+        [SuppressMessage("ReSharper", "InconsistentlySynchronizedField",
+            Justification = "This private method should be always called within a lock on localFilters")]
+        private IEnumerable<LocalHandledEventFilter> GetLocalFilters(object listener, int[] types)
+        {
+            Dictionary<int, LocalHandledEventFilter> filters;
+
+            if (!_localFilters.TryGetValue(listener, out filters))
+                return Enumerable.Empty<LocalHandledEventFilter>();
+
+            // An explicit null passed for a params array is treated like "no types given"
+            // instead of failing with a NullReferenceException.
+            if (types == null || types.Length == 0)
+                return filters.Values;
+
+            // Types without a registered filter are skipped.
+            return types.Select(type =>
+            {
+                LocalHandledEventFilter filter;
+
+                return filters.TryGetValue(type, out filter) ? filter : null;
+            }).Where(x => x != null);
+        }
+
+        /// <summary>
+        /// Subscribes a listener to local events of a single type, creating and caching
+        /// its wrapper on first use.
+        /// </summary>
+        /// <typeparam name="T">Type of events.</typeparam>
+        /// <param name="listener">Predicate that is called on each received event.</param>
+        /// <param name="type">Event type for which this listener will be notified</param>
+        private void LocalListen<T>(IEventFilter<T> listener, int type) where T : IEvent
+        {
+            lock (_localFilters)
+            {
+                // Per-listener map: event type -> wrapper.
+                Dictionary<int, LocalHandledEventFilter> byType;
+
+                if (!_localFilters.TryGetValue(listener, out byType))
+                {
+                    byType = new Dictionary<int, LocalHandledEventFilter>();
+
+                    _localFilters.Add(listener, byType);
+                }
+
+                LocalHandledEventFilter wrapper;
+
+                if (!byType.TryGetValue(type, out wrapper))
+                {
+                    wrapper = CreateLocalFilter(listener, type);
+
+                    byType.Add(type, wrapper);
+                }
+
+                UU.EventsLocalListen(Target, wrapper.Handle, type);
+            }
+        }
+
+        /// <summary>
+        /// Creates a user filter wrapper and allocates a handle for it in the handle registry.
+        /// </summary>
+        /// <typeparam name="T">Event object type.</typeparam>
+        /// <param name="listener">Listener.</param>
+        /// <param name="type">Event type.</param>
+        /// <returns>Created wrapper with its Handle field set.</returns>
+        private LocalHandledEventFilter CreateLocalFilter<T>(IEventFilter<T> listener, int type) where T : IEvent
+        {
+            var result = new LocalHandledEventFilter(
+                stream => InvokeLocalFilter(stream, listener),
+                unused =>
+                {
+                    // Release action: drop this (listener, type) registration from the local filters map.
+                    // NOTE(review): presumably invoked by the Handle base class when the handle is released - confirm.
+                    lock (_localFilters)
+                    {
+                        Dictionary<int, LocalHandledEventFilter> filters;
+
+                        if (_localFilters.TryGetValue(listener, out filters))
+                        {
+                            filters.Remove(type);
+
+                            // Remove the listener entry once its last event type is gone.
+                            if (filters.Count == 0)
+                                _localFilters.Remove(listener);
+                        }
+                    }
+                });
+
+            // The wrapper stores its own handle so that subscribe/unsubscribe calls can pass it along.
+            result.Handle = Ignite.HandleRegistry.Allocate(result);
+
+            return result;
+        }
+
+        /// <summary>
+        /// Reads one event from the stream and passes it to the listener.
+        /// </summary>
+        /// <typeparam name="T">Event object type.</typeparam>
+        /// <param name="stream">The stream.</param>
+        /// <param name="listener">The listener.</param>
+        /// <returns>Filter invocation result.</returns>
+        private bool InvokeLocalFilter<T>(IPortableStream stream, IEventFilter<T> listener) where T : IEvent
+        {
+            var input = Marshaller.StartUnmarshal(stream);
+
+            var localEvent = EventReader.Read<T>(input);
+
+            // No guid in local mode
+            var accepted = listener.Invoke(Guid.Empty, localEvent);
+
+            return accepted;
+        }
+
+        /// <summary>
+        /// Writes the event types as an int array. Both a null and an empty array are written as null.
+        /// </summary>
+        /// <param name="types">Types; may be null or empty.</param>
+        /// <param name="writer">Writer.</param>
+        private static void WriteEventTypes(int[] types, IPortableRawWriter writer)
+        {
+            // The null check avoids a NullReferenceException when null is passed explicitly for a params array;
+            // null already has the "no filtering" meaning on the wire.
+            if (types != null && types.Length == 0)
+                types = null;  // empty array means no type filtering
+
+            writer.WriteIntArray(types);
+        }
+
+        /// <summary>
+        /// Reads the event types.
+        /// </summary>
+        /// <param name="reader">Stream to read from.</param>
+        /// <returns>Int array read from the stream.</returns>
+        private int[] ReadEventTypes(IPortableStream reader)
+        {
+            return Marshaller.StartUnmarshal(reader).ReadIntArray();
+        }
+
+        /// <summary>
+        /// Local user filter wrapper. Adapts a stream-based predicate to <see cref="IInteropCallback"/>.
+        /// </summary>
+        private class LocalEventFilter : IInteropCallback
+        {
+            /** Predicate invoked with the incoming stream; must be set before Invoke is called. */
+            public Func<IPortableStream, bool> InvokeFunc;
+
+            /** <inheritdoc /> */
+            public int Invoke(IPortableStream stream)
+            {
+                // The boolean result is reported as 1 (true) or 0 (false).
+                return InvokeFunc(stream) ? 1 : 0;
+            }
+        }
+
+        /// <summary>
+        /// Local user filter wrapper with handle.
+        /// </summary>
+        private class LocalHandledEventFilter : Handle<Func<IPortableStream, bool>>, IInteropCallback
+        {
+            /** Handle registry id of this wrapper; assigned by the creator after allocation. */
+            public long Handle;
+
+            /** <inheritdoc /> */
+            public int Invoke(IPortableStream stream)
+            {
+                // NOTE(review): Target presumably is the invoke function held by the Handle base class - confirm.
+                // The boolean result is reported as 1 (true) or 0 (false).
+                return Target(stream) ? 1 : 0;
+            }
+
+            /// <summary>
+            /// Initializes a new instance of the <see cref="LocalHandledEventFilter"/> class.
+            /// </summary>
+            /// <param name="invokeFunc">The invoke function.</param>
+            /// <param name="releaseAction">The release action.</param>
+            public LocalHandledEventFilter(
+                Func<IPortableStream, bool> invokeFunc, Action<Func<IPortableStream, bool>> releaseAction) 
+                : base(invokeFunc, releaseAction)
+            {
+                // No-op.
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
new file mode 100644
index 0000000..632d8b8
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Events
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Threading;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Events;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+    /// <summary>
+    /// Async Ignite events.
+    /// </summary>
+    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+    internal class EventsAsync : Events
+    {
+        /** Opcode of the last operation started on the current thread (OpNone initially). */
+        private readonly ThreadLocal<int> _lastAsyncOp = new ThreadLocal<int>(() => OpNone);
+
+        /** Future prepared by the last operation on the current thread, if any; handed out and cleared by GetFuture. */
+        private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="EventsAsync"/> class.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="clusterGroup">Cluster group.</param>
+        public EventsAsync(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup)
+            : base(target, marsh, clusterGroup)
+        {
+            // No-op.
+        }
+
+        /** <inheritdoc /> */
+        public override List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
+        {
+            // Remember the opcode for this thread.
+            _lastAsyncOp.Value = (int) Op.RemoteQuery;
+
+            var result = base.RemoteQuery(filter, timeout, types);
+
+            // Result is a List<T> so we can't create proper converter later in GetFuture call from user.
+            // The future is therefore created here, while T is known, and stored in _curFut for GetFuture to return.
+            // ReSharper disable once RedundantTypeArgumentsOfMethod (otherwise won't compile in VS2010 / TC)
+            _curFut.Value = GetFuture<List<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp,
+                (int) Op.RemoteQuery), convertFunc: ReadEvents<T>);
+
+            return result;
+        }
+
+        /** <inheritdoc /> */
+        public override Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
+            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
+        {
+            // Record the opcode and drop any future left over from a previous operation on this thread;
+            // GetFuture will then build a future for this opcode.
+            _lastAsyncOp.Value = (int) Op.RemoteListen;
+            _curFut.Value = null;
+
+            return base.RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, types);
+        }
+
+        /** <inheritdoc /> */
+        public override void StopRemoteListen(Guid opId)
+        {
+            // Record the opcode and drop any future left over from a previous operation on this thread.
+            _lastAsyncOp.Value = (int) Op.StopRemoteListen;
+            _curFut.Value = null;
+
+            base.StopRemoteListen(opId);
+        }
+
+        /** <inheritdoc /> */
+        public override T WaitForLocal<T>(IEventFilter<T> filter, params int[] types)
+        {
+            // Remember the opcode so that GetFuture can pick the event converter.
+            _lastAsyncOp.Value = (int) Op.WaitForLocal;
+
+            long hnd = 0;
+
+            try
+            {
+                var result = WaitForLocal0(filter, ref hnd, types);
+
+                if (filter != null)
+                {
+                    // Dispose handle as soon as future ends.
+                    var fut = GetFuture<T>();
+
+                    _curFut.Value = fut;
+
+                    fut.Listen(() => Ignite.HandleRegistry.Release(hnd));
+                }
+                else
+                    _curFut.Value = null;
+
+                return result;
+            }
+            catch (Exception)
+            {
+                // A handle is allocated only when a filter was supplied, so release only in that case
+                // (same condition as the synchronous implementation in the base class).
+                if (filter != null)
+                    Ignite.HandleRegistry.Release(hnd);
+
+                throw;
+            }
+        }
+
+        /** <inheritdoc /> */
+        public override IEvents WithAsync()
+        {
+            // Already asynchronous.
+            return this;
+        }
+
+        /** <inheritdoc /> */
+        public override bool IsAsync
+        {
+            get { return true; }
+        }
+
+        /** <inheritdoc /> */
+        public override IFuture GetFuture()
+        {
+            // Non-generic variant delegates to the generic one with object as the result type.
+            return GetFuture<object>();
+        }
+
+        /** <inheritdoc /> */
+        public override IFuture<T> GetFuture<T>()
+        {
+            // A future prepared by the last operation on this thread takes precedence and is handed out once.
+            if (_curFut.Value != null)
+            {
+                var fut = _curFut.Value;
+                _curFut.Value = null;
+                return (IFuture<T>) fut;
+            }
+
+            // Read the thread-local opcode once, on the calling thread, so that the converter choice and
+            // the listen callback below always use the same value regardless of when or where the callback runs.
+            var lastOp = _lastAsyncOp.Value;
+
+            Func<PortableReaderImpl, T> converter = null;
+
+            // WaitForLocal results are events and need the event reader; other operations use no converter.
+            if (lastOp == (int) Op.WaitForLocal)
+                converter = reader => (T) EventReader.Read<IEvent>(reader);
+
+            return GetFuture((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp, lastOp),
+                convertFunc: converter);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
new file mode 100644
index 0000000..8b44966
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Events
+{
+    using System;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Events;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+
+    /// <summary>
+    /// Holder of an event filter/listener used by RemoteListen.
+    /// </summary>
+    internal class RemoteListenEventFilter : IInteropCallback
+    {
+        /** Grid whose marshaller is used to read incoming events. */
+        private readonly Ignite _ignite;
+
+        /** Wrapped predicate: (node id, event) -> result. */
+        private readonly Func<Guid, IEvent, bool> _filter;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="RemoteListenEventFilter"/> class.
+        /// </summary>
+        /// <param name="ignite">The grid.</param>
+        /// <param name="filter">The filter.</param>
+        public RemoteListenEventFilter(Ignite ignite, Func<Guid, IEvent, bool> filter)
+        {
+            _filter = filter;
+            _ignite = ignite;
+        }
+
+        /** <inheritdoc /> */
+        public int Invoke(IPortableStream stream)
+        {
+            var input = _ignite.Marshaller.StartUnmarshal(stream);
+
+            // The event comes first in the stream, followed by the (nullable) node id.
+            var evt = EventReader.Read<IEvent>(input);
+            var nodeId = input.ReadGuid() ?? Guid.Empty;
+
+            if (_filter(nodeId, evt))
+            {
+                return 1;
+            }
+
+            return 0;
+        }
+
+        /// <summary>
+        /// Creates an instance of this class from the memory chunk at the given pointer.
+        /// </summary>
+        /// <param name="memPtr">Memory pointer.</param>
+        /// <param name="grid">Grid</param>
+        /// <returns>Deserialized instance of <see cref="RemoteListenEventFilter"/></returns>
+        public static RemoteListenEventFilter CreateInstance(long memPtr, Ignite grid)
+        {
+            Debug.Assert(grid != null);
+
+            using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+            {
+                var input = grid.Marshaller.StartUnmarshal(stream);
+
+                // The user predicate is wrapped in a holder.
+                var holder = input.ReadObject<PortableOrSerializableObjectHolder>();
+
+                var pred = holder.Item;
+
+                // Resolve an invoker for the predicate's runtime type.
+                var invoker = DelegateTypeDescriptor.GetEventFilter(pred.GetType());
+
+                return new RemoteListenEventFilter(grid, (id, evt) => invoker(pred, id, evt));
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
new file mode 100644
index 0000000..066f345
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Runtime.InteropServices;
+    using System.Security;
+    using System.Threading;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Store;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Maps Java exception class names to .Net exceptions; unmapped classes become <see cref="IgniteException"/>.
+    /// </summary>
+    internal static class ExceptionUtils
+    {
+        /** NoClassDefFoundError fully-qualified class name which is important during startup phase. */
+        private const string ClsNoClsDefFoundErr = "java.lang.NoClassDefFoundError";
+
+        /** NoSuchMethodError fully-qualified class name which is important during startup phase. */
+        private const string ClsNoSuchMthdErr = "java.lang.NoSuchMethodError";
+
+        /** PlatformCachePartialUpdateException fully-qualified class name; handled separately from the map. */
+        private const string ClsCachePartialUpdateErr = "org.apache.ignite.internal.processors.platform.cache.PlatformCachePartialUpdateException";
+
+        /** Predefined exception factories keyed by fully-qualified Java class name. */
+        private static readonly IDictionary<string, ExceptionFactoryDelegate> EXS = new Dictionary<string, ExceptionFactoryDelegate>();
+
+        /** Exception factory delegate: builds a .Net exception from the Java exception message. */
+        private delegate Exception ExceptionFactoryDelegate(string msg);
+
+        /// <summary>
+        /// Static initializer. Registers the factories for all predefined Java exception classes.
+        /// </summary>
+        static ExceptionUtils()
+        {
+            // Common Java exceptions mapped to common .Net exceptions.
+            EXS["java.lang.IllegalArgumentException"] = m => new ArgumentException(m);
+            EXS["java.lang.IllegalStateException"] = m => new InvalidOperationException(m);
+            EXS["java.lang.UnsupportedOperationException"] = m => new NotImplementedException(m);
+            EXS["java.lang.InterruptedException"] = m => new ThreadInterruptedException(m);
+
+            // Generic Ignite exceptions.
+            EXS["org.apache.ignite.IgniteException"] = m => new IgniteException(m);
+            EXS["org.apache.ignite.IgniteCheckedException"] = m => new IgniteException(m);
+
+            // Cluster exceptions.
+            EXS["org.apache.ignite.cluster.ClusterGroupEmptyException"] = m => new ClusterGroupEmptyException(m);
+            EXS["org.apache.ignite.cluster.ClusterTopologyException"] = m => new ClusterTopologyException(m);
+
+            // Compute exceptions.
+            EXS["org.apache.ignite.compute.ComputeExecutionRejectedException"] = m => new ComputeExecutionRejectedException(m);
+            EXS["org.apache.ignite.compute.ComputeJobFailoverException"] = m => new ComputeJobFailoverException(m);
+            EXS["org.apache.ignite.compute.ComputeTaskCancelledException"] = m => new ComputeTaskCancelledException(m);
+            EXS["org.apache.ignite.compute.ComputeTaskTimeoutException"] = m => new ComputeTaskTimeoutException(m);
+            EXS["org.apache.ignite.compute.ComputeUserUndeclaredException"] = m => new ComputeUserUndeclaredException(m);
+
+            // Cache exceptions.
+            EXS["javax.cache.CacheException"] = m => new CacheException(m);
+            EXS["javax.cache.integration.CacheLoaderException"] = m => new CacheStoreException(m);
+            EXS["javax.cache.integration.CacheWriterException"] = m => new CacheStoreException(m);
+            EXS["javax.cache.processor.EntryProcessorException"] = m => new CacheEntryProcessorException(m);
+            EXS["org.apache.ignite.cache.CacheAtomicUpdateTimeoutException"] = m => new CacheAtomicUpdateTimeoutException(m);
+
+            // Transaction exceptions.
+            EXS["org.apache.ignite.transactions.TransactionOptimisticException"] = m => new TransactionOptimisticException(m);
+            EXS["org.apache.ignite.transactions.TransactionTimeoutException"] = m => new TransactionTimeoutException(m);
+            EXS["org.apache.ignite.transactions.TransactionRollbackException"] = m => new TransactionRollbackException(m);
+            EXS["org.apache.ignite.transactions.TransactionHeuristicException"] = m => new TransactionHeuristicException(m);
+
+            // Security exceptions.
+            EXS["org.apache.ignite.IgniteAuthenticationException"] = m => new SecurityException(m);
+            EXS["org.apache.ignite.plugin.security.GridSecurityException"] = m => new SecurityException(m);
+        }
+
+        /// <summary>
+        /// Creates exception according to native code class and message.
+        /// </summary>
+        /// <param name="clsName">Exception class name; null is treated as an unmapped class.</param>
+        /// <param name="msg">Exception message.</param>
+        /// <param name="reader">Error data reader; only used for cache partial update errors.</param>
+        /// <returns>Mapped exception, or a generic <see cref="IgniteException"/> for unmapped classes.</returns>
+        public static Exception GetException(string clsName, string msg, PortableReaderImpl reader = null)
+        {
+            ExceptionFactoryDelegate ctor;
+            // Dictionary lookup throws on a null key, so a null class name must skip straight to the fallbacks.
+            if (clsName != null && EXS.TryGetValue(clsName, out ctor))
+                return ctor(msg);
+
+            if (ClsNoClsDefFoundErr.Equals(clsName))
+                return new IgniteException("Java class is not found (did you set IGNITE_HOME environment " +
+                    "variable?): " + msg);
+
+            if (ClsNoSuchMthdErr.Equals(clsName))
+                return new IgniteException("Java class method is not found (did you set IGNITE_HOME environment " +
+                    "variable?): " + msg);
+
+            if (ClsCachePartialUpdateErr.Equals(clsName))
+                return ProcessCachePartialUpdateException(msg, reader);
+
+            return new IgniteException("Java exception occurred [class=" + clsName + ", message=" + msg + ']');
+        }
+
+        /// <summary>
+        /// Process cache partial update exception.
+        /// </summary>
+        /// <param name="msg">Message.</param>
+        /// <param name="reader">Reader; may be null, in which case failed keys are reported as unavailable.</param>
+        /// <returns>Partial update exception holding either the failed keys or the reason they are missing.</returns>
+        private static Exception ProcessCachePartialUpdateException(string msg, PortableReaderImpl reader)
+        {
+            if (reader == null)
+                return new CachePartialUpdateException(msg, new IgniteException("Failed keys are not available."));
+
+            bool dataExists = reader.ReadBoolean();
+
+            Debug.Assert(dataExists);
+
+            if (reader.ReadBoolean()) // True when the keys were written; false means an error description follows.
+            {
+                bool keepPortable = reader.ReadBoolean();
+
+                PortableReaderImpl keysReader = reader.Marshaller.StartUnmarshal(reader.Stream, keepPortable);
+
+                try
+                {
+                    return new CachePartialUpdateException(msg, ReadNullableList(keysReader));
+                }
+                catch (Exception e)
+                {
+                    // Failed to deserialize data.
+                    return new CachePartialUpdateException(msg, e);
+                }
+            }
+
+            // Was not able to write keys.
+            string innerErrCls = reader.ReadString();
+            string innerErrMsg = reader.ReadString();
+
+            Exception innerErr = GetException(innerErrCls, innerErrMsg);
+
+            return new CachePartialUpdateException(msg, innerErr);
+        }
+
+        /// <summary>
+        /// Create JVM initialization exception.
+        /// </summary>
+        /// <param name="clsName">Class name.</param>
+        /// <param name="msg">Message.</param>
+        /// <returns>Exception.</returns>
+        public static Exception GetJvmInitializeException(string clsName, string msg)
+        {
+            if (clsName != null)
+                return new IgniteException("Failed to initialize JVM.", GetException(clsName, msg));
+
+            if (msg != null)
+                return new IgniteException("Failed to initialize JVM: " + msg);
+
+            return new IgniteException("Failed to initialize JVM.");
+        }
+
+        /// <summary>
+        /// Reads nullable list.
+        /// </summary>
+        /// <param name="reader">Reader.</param>
+        /// <returns>List, or null when the leading boolean flag read from the reader is false.</returns>
+        private static List<object> ReadNullableList(PortableReaderImpl reader)
+        {
+            if (!reader.ReadBoolean())
+                return null;
+
+            var size = reader.ReadInt();
+
+            var list = new List<object>(size);
+
+            for (int i = 0; i < size; i++)
+                list.Add(reader.ReadObject<object>());
+
+            return list;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs
index 2a67c41..9c8178f 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs
@@ -253,7 +253,7 @@ namespace Apache.Ignite.Core.Impl.Handle
             }
 
             if (throwOnAbsent)
-                throw new InvalidOperationException("Resource handle has been released (is grid stopping?).");
+                throw new InvalidOperationException("Resource handle has been released (is Ignite stopping?).");
 
             return default(T);
         }
@@ -333,7 +333,7 @@ namespace Apache.Ignite.Core.Impl.Handle
         /// <returns>Exception.</returns>
         private static Exception ClosedException()
         {
-            return new InvalidOperationException("Cannot allocate a resource handle because grid is stopping.");
+            return new InvalidOperationException("Cannot allocate a resource handle because Ignite is stopping.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs
new file mode 100644
index 0000000..91838d0
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+    using Apache.Ignite.Core.Impl.Portable.IO;
+
+    /// <summary>
+    /// Interop callback which is given a portable stream and reports an integer result.
+    /// </summary>
+    internal interface IInteropCallback
+    {
+        /// <summary>
+        /// Invokes callback.
+        /// </summary>
+        /// <param name="stream">Stream passed to the callback.</param>
+        /// <returns>Invocation result as an integer.</returns>
+        int Invoke(IPortableStream stream);
+    }
+}
\ No newline at end of file


Mime
View raw message