ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-4477: Added IgniteFuture.listenAsync() and IgniteFuture.chainAsync() methods. This closes #1844.
Date Sat, 03 Jun 2017 13:43:59 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 9d2ec1cc2 -> b375c1ef5


IGNITE-4477: Added IgniteFuture.listenAsync() and IgniteFuture.chainAsync() methods. This closes #1844.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b375c1ef
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b375c1ef
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b375c1ef

Branch: refs/heads/master
Commit: b375c1ef504c72b9ffd653933da4bacea5aad5b5
Parents: 9d2ec1c
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Sat Jun 3 16:43:35 2017 +0300
Committer: devozerov <ppozerov@gmail.com>
Committed: Sat Jun 3 16:43:35 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheFutureImpl.java |   8 +-
 .../util/future/AsyncFutureListener.java        |  57 +++
 .../internal/util/future/IgniteFutureImpl.java  |  33 +-
 .../org/apache/ignite/lang/IgniteFuture.java    |  24 +-
 .../util/future/IgniteCacheFutureImplTest.java  |  46 ++
 .../util/future/IgniteFutureImplTest.java       | 466 +++++++++++++++++--
 .../testsuites/IgniteLangSelfTestSuite.java     |   3 +
 .../processors/schedule/ScheduleFutureImpl.java |  51 +-
 .../schedule/GridScheduleSelfTest.java          |  84 +++-
 9 files changed, 724 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
index 74cccc1..c861be8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -43,7 +44,12 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> {
 
     /** {@inheritDoc} */
     @Override public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb) {
-        return new IgniteCacheFutureImpl<>(chainInternal(doneCb));
+        return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<V>, T> doneCb, Executor exec) {
+        return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec));
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java
new file mode 100644
index 0000000..460ce8b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.future;
+
+import java.util.concurrent.Executor;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Wraps a listener so that it is applied from a task submitted to the given executor.
+ */
+public class AsyncFutureListener<V> implements IgniteInClosure<IgniteFuture<V>> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Wrapped listener; applied to the future inside the submitted task. */
+    private final IgniteInClosure<? super IgniteFuture<V>> lsnr;
+
+    /** Executor the listener invocation is submitted to. */
+    private final Executor exec;
+
+    /**
+     * @param lsnr Listener to be called asynchronously. Asserted to be non-null.
+     * @param exec Executor to process listener. Asserted to be non-null.
+     */
+    public AsyncFutureListener(IgniteInClosure<? super IgniteFuture<V>> lsnr, Executor exec) {
+        assert lsnr != null;
+        assert exec != null;
+
+        this.lsnr = lsnr;
+        this.exec = exec;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(final IgniteFuture<V> fut) {
+        exec.execute(new Runnable() {
+            @Override public void run() {
+                lsnr.apply(fut);
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
index 08fae96..a018628 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.util.future;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Implementation of public API future.
@@ -70,16 +72,34 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
     }
 
     /** {@inheritDoc} */
+    @Override public void listenAsync(IgniteInClosure<? super IgniteFuture<V>> lsnr, Executor exec) {
+        A.notNull(lsnr, "lsnr");
+        A.notNull(exec, "exec");
+
+        fut.listen(new InternalFutureListener(new AsyncFutureListener<>(lsnr, exec)));
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<V>, T> doneCb) {
-        return new IgniteFutureImpl<>(chainInternal(doneCb));
+        return new IgniteFutureImpl<>(chainInternal(doneCb, null));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<T> chainAsync(final IgniteClosure<? super IgniteFuture<V>, T> doneCb,
+        Executor exec) {
+        A.notNull(doneCb, "doneCb");
+        A.notNull(exec, "exec");
+
+        return new IgniteFutureImpl<>(chainInternal(doneCb, exec));
     }
 
     /**
      * @param doneCb Done callback.
      * @return Internal future
      */
-    protected  <T> IgniteInternalFuture<T> chainInternal(final IgniteClosure<? super IgniteFuture<V>, T> doneCb) {
-        return fut.chain(new C1<IgniteInternalFuture<V>, T>() {
+    protected  <T> IgniteInternalFuture<T> chainInternal(final IgniteClosure<? super IgniteFuture<V>, T> doneCb,
+        @Nullable Executor exec) {
+        C1<IgniteInternalFuture<V>, T> clos = new C1<IgniteInternalFuture<V>, T>() {
             @Override public T apply(IgniteInternalFuture<V> fut) {
                 assert IgniteFutureImpl.this.fut == fut;
 
@@ -90,7 +110,12 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
                     throw new GridClosureException(e);
                 }
             }
-        });
+        };
+
+        if (exec != null)
+            return fut.chain(clos, exec);
+
+        return fut.chain(clos);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
index 6519ec8..ee297cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.lang;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
@@ -96,12 +97,23 @@ public interface IgniteFuture<V> {
 
     /**
      * Registers listener closure to be asynchronously notified whenever future completes.
+     * The closure will be processed in the thread that completes this future or, if the future has already
+     * completed, immediately in the current thread.
      *
-     * @param lsnr Listener closure to register. If not provided - this method is no-op.
+     * @param lsnr Listener closure to register. Cannot be {@code null}.
      */
     public void listen(IgniteInClosure<? super IgniteFuture<V>> lsnr);
 
     /**
+     * Registers listener closure to be asynchronously notified whenever future completes.
+     * Closure will be processed in specified executor.
+     *
+     * @param lsnr Listener closure to register. Cannot be {@code null}.
+     * @param exec Executor to run listener. Cannot be {@code null}.
+     */
+    public void listenAsync(IgniteInClosure<? super IgniteFuture<V>> lsnr, Executor exec);
+
+    /**
      * Make a chained future to convert result of this future (when complete) into a new format.
      * It is guaranteed that done callback will be called only ONCE.
      *
@@ -109,4 +121,14 @@ public interface IgniteFuture<V> {
      * @return Chained future that finishes after this future completes and done callback is called.
      */
     public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb);
+
+    /**
+     * Make a chained future to convert result of this future (when complete) into a new format.
+     * It is guaranteed that done callback will be called only ONCE.
+     *
+     * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result.
+     * @param exec Executor to run done callback. Cannot be {@code null}.
+     * @return Chained future that finishes after this future completes and done callback is called.
+     */
+    public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<V>, T> doneCb, Executor exec);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
new file mode 100644
index 0000000..46f1706
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.future;
+
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
+
+/**
+ * Runs the {@link IgniteFutureImplTest} scenarios against futures created as {@link IgniteCacheFutureImpl}.
+ */
+public class IgniteCacheFutureImplTest extends IgniteFutureImplTest {
+    /** {@inheritDoc} */
+    @Override protected <V> IgniteFutureImpl<V> createFuture(IgniteInternalFuture<V> fut) {
+        return new IgniteCacheFutureImpl<>(fut);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Class<? extends Exception> expectedException() {
+        return CacheException.class;
+    }
+
+    /** {@inheritDoc} An {@link IgniteException} is looked up one cause level deeper than other errors. */
+    @Override protected void assertExpectedException(Exception e, Exception exp) {
+        if (exp instanceof IgniteException)
+            assertEquals(exp, e.getCause().getCause());
+        else
+            assertEquals(exp, e.getCause());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
index 1e5b9a8..3a06cf4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
@@ -18,10 +18,17 @@
 package org.apache.ignite.internal.util.future;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -29,18 +36,47 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
 public class IgniteFutureImplTest extends GridCommonAbstractTest {
+    /** Context thread name. */
+    private static final String CTX_THREAD_NAME = "test-async";
+
+    /** Custom thread name. */
+    private static final String CUSTOM_THREAD_NAME = "test-custom-async";
+
+    /** Test executor. */
+    private ExecutorService ctxExec;
+
+    /** Custom executor. */
+    private ExecutorService customExec;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+    @Override protected void beforeTest() throws Exception {
+        ctxExec = createExecutor(CTX_THREAD_NAME);
+        customExec = createExecutor(CUSTOM_THREAD_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.shutdownNow(getClass(), ctxExec, log);
+        U.shutdownNow(getClass(), customExec, log);
+
+        ctxExec = null;
+        customExec = null;
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testFutureGet() throws Exception {
         GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-        IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+        IgniteFutureImpl<String> fut = createFuture(fut0);
 
         assertFalse(fut.isDone());
 
@@ -59,7 +95,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
     public void testFutureException() throws Exception {
         GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-        final IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+        final IgniteFutureImpl<String> fut = createFuture(fut0);
 
         assertFalse(fut.isDone());
 
@@ -69,27 +105,27 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
         fut0.onDone(err0);
 
-        IgniteException err = (IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() {
+        Exception err = (Exception)GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 fut.get();
 
                 return null;
             }
-        }, IgniteException.class, "test error");
+        }, expectedException(), "test error");
 
-        assertEquals(err0, err.getCause());
+        assertExpectedException(err, err0);
 
         assertTrue(fut.isDone());
 
-        err = (IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() {
+        err = (Exception)GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 fut.get();
 
                 return null;
             }
-        }, IgniteException.class, null);
+        }, expectedException(), null);
 
-        assertEquals(err0, err.getCause());
+        assertExpectedException(err, err0);
     }
 
     /**
@@ -98,21 +134,21 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
     public void testFutureIgniteException() throws Exception {
         GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-        final IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+        final IgniteFutureImpl<String> fut = createFuture(fut0);
 
         IgniteException err0 = new IgniteException("test error");
 
         fut0.onDone(err0);
 
-        IgniteException err = (IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() {
+        Exception err = (Exception)GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 fut.get();
 
                 return null;
             }
-        }, IgniteException.class, "test error");
+        }, expectedException(), "test error");
 
-        assertEquals(err0, err);
+        assertExpectedException(err, err0);
     }
 
     /**
@@ -121,7 +157,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
     public void testListeners() throws Exception {
         GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-        IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+        IgniteFutureImpl<String> fut = createFuture(fut0);
 
         final AtomicInteger lsnr1Cnt = new AtomicInteger();
 
@@ -166,7 +202,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
         {
             GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-            IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+            IgniteFutureImpl<String> fut = createFuture(fut0);
 
             final IgniteException err0 = new IgniteException("test error");
 
@@ -179,8 +215,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                         fail();
                     }
-                    catch (IgniteException err) {
-                        assertEquals(err0, err);
+                    catch (IgniteException | CacheException err) {
+                        assertExpectedException(err, err0);
 
                         passed.set(true);
                     }
@@ -197,7 +233,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
         {
             GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-            IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+            IgniteFutureImpl<String> fut = createFuture(fut0);
 
             final IgniteCheckedException err0 = new IgniteCheckedException("test error");
 
@@ -210,8 +246,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                         fail();
                     }
-                    catch (IgniteException err) {
-                        assertEquals(err0, err.getCause());
+                    catch (IgniteException | CacheException err) {
+                        assertExpectedException(err, err0);
 
                         passed.set(true);
                     }
@@ -229,10 +265,130 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testAsyncListeners() throws Exception {
+        GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
+
+        IgniteFutureImpl<String> fut = createFuture(fut0);
+
+        final CountDownLatch latch1 = new CountDownLatch(1);
+
+        IgniteInClosure<? super IgniteFuture<String>> lsnr1 = createAsyncListener(latch1, CUSTOM_THREAD_NAME, null);
+
+        assertFalse(fut.isDone());
+
+        fut.listenAsync(lsnr1, customExec);
+
+        U.sleep(100);
+
+        assertEquals(1, latch1.getCount());
+
+        fut0.onDone("test");
+
+        assert latch1.await(1, TimeUnit.SECONDS) : latch1.getCount();
+
+        checkAsyncListener(fut);
+        checkAsyncListener(createFuture(new GridFinishedFuture<>("test")));
+    }
+
+    /**
+     * @param fut Future to register an async listener on; the listener must run within one second.
+     */
+    private void checkAsyncListener(IgniteFutureImpl<String> fut) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteInClosure<? super IgniteFuture<String>> lsnr = createAsyncListener(latch, CUSTOM_THREAD_NAME, null);
+
+        fut.listenAsync(lsnr, customExec);
+
+        assert latch.await(1, TimeUnit.SECONDS) : latch.getCount();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsyncListenersOnError() throws Exception {
+        checkAsyncListenerOnError(new IgniteException("Test exception"));
+        checkAsyncListenerOnError(new IgniteCheckedException("Test checked exception"));
+    }
+
+    /**
+     * @param err0 Exception to fail the future with; listeners added before and after completion must see it.
+     */
+    private void checkAsyncListenerOnError(Exception err0) throws InterruptedException {
+        GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
+
+        IgniteFutureImpl<String> fut = createFuture(fut0);
+
+        final CountDownLatch latch1 = new CountDownLatch(1);
+
+        IgniteInClosure<? super IgniteFuture<String>> lsnr1 = createAsyncListener(latch1, CUSTOM_THREAD_NAME, err0);
+
+        fut.listenAsync(lsnr1, customExec);
+
+        assertEquals(1, latch1.getCount());
+
+        fut0.onDone(err0);
+
+        assert latch1.await(1, TimeUnit.SECONDS);
+
+        checkAsyncListenerOnError(err0, fut);
+        checkAsyncListenerOnError(err0, createFuture(new GridFinishedFuture<String>(err0)));
+    }
+
+    /**
+     * @param err0 Exception the listener is expected to observe when it calls {@code get()} on the future.
+     * @param fut Future to register an async listener on; the listener must run within one second.
+     */
+    private void checkAsyncListenerOnError(Exception err0, IgniteFutureImpl<String> fut) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteInClosure<? super IgniteFuture<String>> lsnr = createAsyncListener(latch, CUSTOM_THREAD_NAME, err0);
+
+        fut.listenAsync(lsnr, customExec);
+
+        assert latch.await(1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * @param latch Latch.
+     */
+    @NotNull private CI1<IgniteFuture<String>> createAsyncListener(
+        final CountDownLatch latch,
+        final String threadName,
+        final Exception err
+    ) {
+        return new CI1<IgniteFuture<String>>() {
+            @Override public void apply(IgniteFuture<String> fut) {
+                try {
+                    String tname = Thread.currentThread().getName();
+
+                    assert tname.contains(threadName) : tname;
+
+                    assertEquals("test", fut.get());
+
+                    if (err != null)
+                        fail();
+                }
+                catch (IgniteException | CacheException e) {
+                    if (err != null)
+                        assertExpectedException(e, err);
+                    else
+                        throw e;
+                }
+                finally {
+                    latch.countDown();
+                }
+            }
+        };
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testChain() throws Exception {
         GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-        IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+        IgniteFutureImpl<String> fut = createFuture(fut0);
 
         IgniteFuture<Integer> chained = fut.chain(new C1<IgniteFuture<String>, Integer>() {
             @Override public Integer apply(IgniteFuture<String> fut) {
@@ -274,7 +430,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
         {
             GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-            IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+            IgniteFutureImpl<String> fut = createFuture(fut0);
 
             final IgniteException err0 = new IgniteException("test error");
 
@@ -289,8 +445,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                         return -1;
                     }
-                    catch (IgniteException err) {
-                        assertEquals(err0, err);
+                    catch (IgniteException | CacheException err) {
+                        assertExpectedException(err, err0);
 
                         chainedPassed.set(true);
 
@@ -308,8 +464,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                         fail();
                     }
-                    catch (IgniteException err) {
-                        assertEquals(err0, err);
+                    catch (IgniteException | CacheException err) {
+                        assertExpectedException(err, err0);
 
                         lsnrPassed.set(true);
                     }
@@ -329,8 +485,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                 fail();
             }
-            catch (IgniteException err) {
-                assertEquals(err0, err);
+            catch (IgniteException | CacheException err) {
+                assertExpectedException(err, err0);
             }
 
             try {
@@ -338,15 +494,15 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                 fail();
             }
-            catch (IgniteException err) {
-                assertEquals(err0, err);
+            catch (IgniteException | CacheException err) {
+                assertExpectedException(err, err0);
             }
         }
 
         {
             GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
 
-            IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0);
+            IgniteFutureImpl<String> fut = createFuture(fut0);
 
             final IgniteCheckedException err0 = new IgniteCheckedException("test error");
 
@@ -361,8 +517,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                         return -1;
                     }
-                    catch (IgniteException err) {
-                        assertEquals(err0, err.getCause());
+                    catch (IgniteException | CacheException err) {
+                        assertExpectedException(err, err0);
 
                         chainedPassed.set(true);
 
@@ -380,8 +536,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                         fail();
                     }
-                    catch (IgniteException err) {
-                        assertEquals(err0, err.getCause());
+                    catch (IgniteException | CacheException err) {
+                        assertExpectedException(err, err0);
 
                         lsnrPassed.set(true);
                     }
@@ -401,8 +557,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                 fail();
             }
-            catch (IgniteException err) {
-                assertEquals(err0, err.getCause());
+            catch (IgniteException | CacheException err) {
+                assertExpectedException(err, err0);
             }
 
             try {
@@ -410,9 +566,243 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
                 fail();
             }
-            catch (IgniteException err) {
-                assertEquals(err0, err.getCause());
+            catch (IgniteException | CacheException err) {
+                assertExpectedException(err, err0);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testChainAsync() throws Exception {
+        GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
+
+        IgniteFuture<String> fut = createFuture(fut0);
+
+        C1<IgniteFuture<String>, Integer> chainClos = new C1<IgniteFuture<String>, Integer>() {
+            @Override public Integer apply(IgniteFuture<String> fut) {
+                assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName());
+
+                return Integer.valueOf(fut.get());
+            }
+        };
+
+        IgniteFuture<Integer> chained1 = fut.chainAsync(chainClos, customExec);
+
+        assertFalse(chained1.isDone());
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        class TestClosure implements CI1<IgniteFuture<Integer>> {
+            private final CountDownLatch latch;
+
+            private TestClosure(CountDownLatch latch) {
+                this.latch = latch;
+            }
+
+            @Override public void apply(IgniteFuture<Integer> fut) {
+                assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName());
+                assertEquals(10, (int)fut.get());
+
+                latch.countDown();
             }
         }
+
+        chained1.listen(new TestClosure(latch));
+
+        fut0.onDone("10");
+
+        // Chained future will be completed asynchronously.
+        chained1.get(100, TimeUnit.MILLISECONDS);
+
+        assertTrue(chained1.isDone());
+
+        assertEquals(10, (int)chained1.get());
+
+        assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
+
+        assertTrue(fut.isDone());
+
+        assertEquals("10", fut.get());
+
+        // Same scenario for a future that is already finished when it is chained.
+        GridFinishedFuture<String> ffut0 = new GridFinishedFuture<>("10");
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+
+        IgniteFuture<Integer> chained2 = createFuture(ffut0).chainAsync(chainClos, customExec);
+
+        chained2.listen(new TestClosure(latch1));
+
+        chained2.get(100, TimeUnit.MILLISECONDS);
+
+        assertTrue(chained2.isDone());
+
+        assertEquals(10, (int)chained2.get());
+
+        assertTrue(latch1.await(100, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testChainAsyncOnError() throws Exception {
+        checkChainedOnError(new IgniteException("Test exception"));
+        checkChainedOnError(new IgniteCheckedException("Test checked exception"));
+        checkChainedOnErrorFinishedFuture(new IgniteException("Test exception"));
+        checkChainedOnErrorFinishedFuture(new IgniteCheckedException("Test checked exception"));
+    }
+
+    /**
+     * @param err Exception the original future is completed with.
+     * @throws Exception If failed.
+     */
+    private void checkChainedOnError(final Exception err) throws Exception {
+        GridFutureAdapter<String> fut0 = new GridFutureAdapter<>();
+
+        IgniteFutureImpl<String> fut = createFuture(fut0);
+
+        // Chain callback will be invoked in specific executor.
+        IgniteFuture<Integer> chained1 = fut.chainAsync(new C1<IgniteFuture<String>, Integer>() {
+            @Override public Integer apply(IgniteFuture<String> fut) {
+                assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName());
+
+                try {
+                    fut.get();
+
+                    fail();
+                }
+                catch (IgniteException | CacheException e) {
+                    assertExpectedException(e, err);
+
+                    throw e;
+                }
+
+                return -1;
+            }
+        }, customExec);
+
+        assertFalse(chained1.isDone());
+        assertFalse(fut.isDone());
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        chained1.listen(new CI1<IgniteFuture<Integer>>() {
+            @Override public void apply(IgniteFuture<Integer> fut) {
+                try {
+                    assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName());
+
+                    fut.get();
+
+                    fail();
+                }
+                catch (IgniteException | CacheException e) {
+                    assertExpectedException(e, err);
+                }
+                finally {
+                    latch.countDown();
+                }
+            }
+        });
+
+        fut0.onDone(err);
+
+        assertExceptionThrown(err, chained1);
+        assertExceptionThrown(err, fut);
+
+        assertTrue(chained1.isDone());
+        assertTrue(fut.isDone());
+
+        assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * @param err Exception the finished future is created with.
+     * @throws Exception If failed.
+     */
+    private void checkChainedOnErrorFinishedFuture(final Exception err) throws Exception {
+        IgniteFutureImpl<String> fut = createFuture(new GridFinishedFuture<String>(err));
+
+        // Chain callback will be invoked in specific executor.
+        IgniteFuture<Integer> chained1 = fut.chainAsync(new C1<IgniteFuture<String>, Integer>() {
+            @Override public Integer apply(IgniteFuture<String> fut) {
+                assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName());
+
+                try {
+                    fut.get();
+
+                    fail();
+                }
+                catch (IgniteException | CacheException e) {
+                    assertExpectedException(e, err);
+
+                    throw e;
+                }
+
+                return -1;
+            }
+        }, customExec);
+
+        assertExceptionThrown(err, chained1);
+        assertExceptionThrown(err, fut);
+
+        assertTrue(chained1.isDone());
+        assertTrue(fut.isDone());
+    }
+
+    /**
+     * @param err Expected exception.
+     * @param fut Future.
+     */
+    private void assertExceptionThrown(Exception err, IgniteFuture<?> fut) {
+        try {
+            fut.get();
+
+            fail();
+        }
+        catch (IgniteException | CacheException e) {
+            assertExpectedException(e, err);
+        }
+    }
+
+    /**
+     * @param e Actual exception.
+     * @param exp Expected exception.
+     */
+    protected void assertExpectedException(Exception e, Exception exp) {
+        if (exp instanceof IgniteException)
+            assertEquals(exp, e);
+        else
+            assertEquals(exp, e.getCause());
+    }
+
+    /**
+     * @param name Name.
+     */
+    @NotNull private ExecutorService createExecutor(final String name) {
+        return Executors.newSingleThreadExecutor(new ThreadFactory() {
+            @Override public Thread newThread(@NotNull Runnable r) {
+                Thread t = new Thread(r);
+
+                t.setName(name);
+
+                return t;
+            }
+        });
+    }
+
+    /**
+     * @param fut Future.
+     */
+    protected <V> IgniteFutureImpl<V> createFuture(IgniteInternalFuture<V> fut) {
+        return new IgniteFutureImpl<>(fut);
+    }
+
+    /**
+     *
+     */
+    protected Class<? extends Exception> expectedException() {
+        return IgniteException.class;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java
index cfec1ec..70aec72 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.internal.util.future.GridCompoundFutureSelfTest;
 import org.apache.ignite.internal.util.future.GridEmbeddedFutureSelfTest;
 import org.apache.ignite.internal.util.future.GridFutureAdapterSelfTest;
+import org.apache.ignite.internal.util.future.IgniteCacheFutureImplTest;
 import org.apache.ignite.internal.util.future.IgniteFutureImplTest;
 import org.apache.ignite.internal.util.future.nio.GridNioEmbeddedFutureSelfTest;
 import org.apache.ignite.internal.util.future.nio.GridNioFutureSelfTest;
@@ -80,7 +81,9 @@ public class IgniteLangSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridEmbeddedFutureSelfTest.class));
         suite.addTest(new TestSuite(GridNioFutureSelfTest.class));
         suite.addTest(new TestSuite(GridNioEmbeddedFutureSelfTest.class));
+
         suite.addTest(new TestSuite(IgniteFutureImplTest.class));
+        suite.addTest(new TestSuite(IgniteCacheFutureImplTest.class));
 
         // Consistent hash tests.
         suite.addTest(new TestSuite(GridConsistentHashSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
index 63a6164..e3c743c 100644
--- a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
+++ b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
@@ -35,6 +36,7 @@ import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.future.AsyncFutureListener;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.lang.GridClosureException;
@@ -579,15 +581,42 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
     }
 
     /** {@inheritDoc} */
+    @Override public void listenAsync(IgniteInClosure<? super IgniteFuture<R>> lsnr, Executor exec) {
+        A.notNull(lsnr, "lsnr");
+        A.notNull(exec, "exec");
+
+        listen(new AsyncFutureListener<>(lsnr, exec));
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
     @Override public <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<R>, T> doneCb) {
-        final GridFutureAdapter<T> fut = new GridFutureAdapter() {
+        A.notNull(doneCb, "doneCb");
+
+        return chain(doneCb, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<R>, T> doneCb, Executor exec) {
+        A.notNull(doneCb, "doneCb");
+        A.notNull(exec, "exec");
+
+        return chain(doneCb, exec);
+    }
+
+    /**
+     * @param doneCb Done callback.
+     * @param exec Executor.
+     * @return Chained future.
+     */
+    private <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<R>, T> doneCb, @Nullable Executor exec) {
+        final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() {
             @Override public String toString() {
                 return "ChainFuture[orig=" + ScheduleFutureImpl.this + ", doneCb=" + doneCb + ']';
             }
         };
 
-        listen(new CI1<IgniteFuture<R>>() {
+        IgniteInClosure<? super IgniteFuture<R>> lsnr = new CI1<IgniteFuture<R>>() {
             @Override public void apply(IgniteFuture<R> fut0) {
                 try {
                     fut.onDone(doneCb.apply(fut0));
@@ -607,7 +636,12 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
                     throw e;
                 }
             }
-        });
+        };
+
+        if (exec != null)
+            lsnr = new AsyncFutureListener<>(lsnr, exec);
+
+        listen(lsnr);
 
         return new IgniteFutureImpl<>(fut);
     }
@@ -861,9 +895,20 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
         }
 
         /** {@inheritDoc} */
+        @Override public void listenAsync(IgniteInClosure<? super IgniteFuture<R>> lsnr, Executor exec) {
+            ref.listenAsync(lsnr, exec);
+        }
+
+        /** {@inheritDoc} */
         @Override public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<R>, T> doneCb) {
             return ref.chain(doneCb);
         }
+
+        /** {@inheritDoc} */
+        @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<R>, T> doneCb,
+            Executor exec) {
+            return ref.chainAsync(doneCb, exec);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java b/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java
index f0860f2..122d98d 100644
--- a/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java
+++ b/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java
@@ -19,21 +19,29 @@ package org.apache.ignite.internal.processors.schedule;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.scheduler.SchedulerFuture;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 
 /**
  * Test for task scheduler.
@@ -43,9 +51,15 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest {
     /** */
     private static final int NODES_CNT = 2;
 
+    /** Custom thread name. */
+    private static final String CUSTOM_THREAD_NAME = "custom-async-test";
+
     /** */
     private static AtomicInteger execCntr = new AtomicInteger(0);
 
+    /** Custom executor. */
+    private ExecutorService exec;
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         startGrids(NODES_CNT);
@@ -61,6 +75,22 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest {
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         execCntr.set(0);
+        exec = Executors.newSingleThreadExecutor(new ThreadFactory() {
+            @Override public Thread newThread(@NotNull Runnable r) {
+                Thread t = new Thread(r);
+
+                t.setName(CUSTOM_THREAD_NAME);
+
+                return t;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.shutdownNow(getClass(), exec, log);
+
+        exec = null;
     }
 
     /**
@@ -124,6 +154,54 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest {
                 }
             });
 
+            final SchedulerFuture<?> fut0 = fut;
+
+            //noinspection ThrowableNotThrown
+            assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    fut0.listenAsync(new IgniteInClosure<IgniteFuture<?>>() {
+                        @Override public void apply(IgniteFuture<?> fut) {
+                            // No-op
+                        }
+                    }, null);
+
+                    return null;
+                }
+            }, NullPointerException.class, null);
+
+            fut.listenAsync(new IgniteInClosure<IgniteFuture<?>>() {
+                @Override public void apply(IgniteFuture<?> fut) {
+                    assertEquals(Thread.currentThread().getName(), CUSTOM_THREAD_NAME);
+
+                    notifyCnt.incrementAndGet();
+                }
+            }, exec);
+
+            //noinspection ThrowableNotThrown
+            assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    fut0.chainAsync(new IgniteClosure<IgniteFuture<?>, String>() {
+                        @Override public String apply(IgniteFuture<?> fut) {
+                            // No-op
+
+                            return null;
+                        }
+                    }, null);
+
+                    return null;
+                }
+            }, NullPointerException.class, null);
+
+            IgniteFuture<String> chained1 = fut.chainAsync(new IgniteClosure<IgniteFuture<?>, String>() {
+                @Override public String apply(IgniteFuture<?> fut) {
+                    assertEquals(Thread.currentThread().getName(), CUSTOM_THREAD_NAME);
+
+                    fut.get();
+
+                    return "done-custom";
+                }
+            }, exec);
+
             long timeTillRun = freq + delay;
 
             info("Going to wait for the first run: " + timeTillRun);
@@ -135,6 +213,7 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest {
             assert !fut.isDone();
             assert !fut.isCancelled();
             assert fut.last() == null;
+            assertFalse(chained1.isDone());
 
             info("Going to wait for 2nd run: " + timeTillRun);
 
@@ -142,10 +221,13 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest {
             Thread.sleep(timeTillRun * 1000);
 
             assert fut.isDone();
-            assert notifyCnt.get() == 2;
+            assert notifyCnt.get() == 2 * 2;
             assert !fut.isCancelled();
             assert fut.last() == null;
 
+            assertEquals("done-custom", chained1.get());
+
+            assertTrue(chained1.isDone());
         }
         finally {
             assert fut != null;


Mime
View raw message