ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/28] ignite git commit: ignite-comm-balance
Date Thu, 08 Dec 2016 13:16:58 GMT
ignite-comm-balance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3d33d241
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3d33d241
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3d33d241

Branch: refs/heads/ignite-4371
Commit: 3d33d241bf7bc018092b6129f6306a2b79e3ec2f
Parents: d6a9767
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 7 12:30:29 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 7 14:54:28 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteInternalFuture.java   |  11 ++
 .../transactions/IgniteTxLocalAdapter.java      |   8 +-
 .../processors/igfs/IgfsDataManager.java        |   6 +-
 .../platform/compute/PlatformCompute.java       |   6 +
 .../util/future/GridFinishedFuture.java         |  24 ++++
 .../internal/util/future/GridFutureAdapter.java |  15 ++-
 .../util/future/GridFutureChainListener.java    |  30 ++++-
 .../TxDeadlockDetectionNoHangsTest.java         |   2 +-
 .../util/future/GridFutureAdapterSelfTest.java  | 122 ++++++++++---------
 9 files changed, 157 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index b80a755..789556d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
@@ -133,6 +134,16 @@ public interface IgniteInternalFuture<R> {
     public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>,
T> doneCb);
 
     /**
+     * Make a chained future to convert result of this future (when complete) into a new
format.
+     * It is guaranteed that done callback will be called only ONCE.
+     *
+     * @param doneCb Done callback that is applied to this future when it finishes to produce
chained future result.
+     * @param exec Executor to run callback.
+     * @return Chained future that finishes after this future completes and done callback
is called.
+     */
+    public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>,
T> doneCb, Executor exec);
+
+    /**
      * @return Error value if future has already been completed with error.
      */
     public Throwable error();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6d21dcf..393fb1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -391,7 +391,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements
Ig
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
-        AffinityTopologyVersion topVer,
+        final AffinityTopologyVersion topVer,
         final boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
@@ -472,7 +472,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements
Ig
                             CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
                             while (true) {
-                                GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+                                GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
 
                                 try {
                                     GridCacheVersion setVer = entry.versionedValue(cacheVal,
ver, null);
@@ -1507,7 +1507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements
Ig
 
                         assert txEntry != null || readCommitted() || skipVals;
 
-                        GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) :
txEntry.cached();
+                        GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer)
: txEntry.cached();
 
                         if (readCommitted() || skipVals) {
                             cacheCtx.evicts().touch(e, topologyVersion());
@@ -1658,7 +1658,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements
Ig
                                             IgniteTxLocalAdapter.this,
                                             /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
                                             /*unmarshal*/true,
-                                            /**update-metrics*/true,
+                                            /*update-metrics*/true,
                                             /*event*/!skipVals,
                                             CU.subjectId(IgniteTxLocalAdapter.this, cctx),
                                             transformClo,

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e534800..4490a68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
@@ -36,6 +37,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -325,6 +327,8 @@ public class IgfsDataManager extends IgfsManager {
         IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key);
 
         if (secReader != null) {
+            Executor exec = igfsCtx.kernalContext().pools().poolForPolicy(GridIoPolicy.IGFS_POOL);
+
             fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() {
                 @Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws
IgniteCheckedException {
                     byte[] res = fut.get();
@@ -365,7 +369,7 @@ public class IgfsDataManager extends IgfsManager {
 
                     return res;
                 }
-            });
+            }, exec);
         }
         else
             igfsCtx.metrics().addReadBlocks(1, 0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 8ff15d5..5383151 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.compute;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.binary.BinaryObject;
@@ -409,6 +410,11 @@ public class PlatformCompute extends PlatformAbstractTarget {
         }
 
         /** {@inheritDoc} */
+        @Override public IgniteInternalFuture chain(IgniteClosure doneCb, Executor exec)
{
+            throw new UnsupportedOperationException("Chain operation is not supported.");
+        }
+
+        /** {@inheritDoc} */
         @Override public Throwable error() {
             return fut.error();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index 6baedbd..dc63adc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.util.future;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -152,6 +153,29 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T>
{
     }
 
     /** {@inheritDoc} */
+    @Override public <T1> IgniteInternalFuture<T1> chain(final IgniteClosure<?
super IgniteInternalFuture<T>, T1> doneCb, Executor exec) {
+        final GridFutureAdapter<T1> fut = new GridFutureAdapter<>();
+
+        exec.execute(new Runnable() {
+            @Override public void run() {
+                try {
+                    fut.onDone(doneCb.apply(GridFinishedFuture.this));
+                }
+                catch (GridClosureException e) {
+                    fut.onDone(e.unwrap());
+                }
+                catch (RuntimeException | Error e) {
+                    fut.onDone(e);
+
+                    throw e;
+                }
+            }
+        });
+
+        return fut;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridFinishedFuture.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 2cd534e..c8d85cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.future;
 
 import java.util.Arrays;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import org.apache.ignite.IgniteCheckedException;
@@ -229,7 +230,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
 
     /** {@inheritDoc} */
     @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<?
super IgniteInternalFuture<R>, T> doneCb) {
-        return new ChainFuture<>(this, doneCb);
+        return new ChainFuture<>(this, doneCb, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<?
super IgniteInternalFuture<R>, T> doneCb,
+        Executor exec) {
+        return new ChainFuture<>(this, doneCb, exec);
     }
 
     /**
@@ -487,15 +494,17 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer
implements
         /**
          * @param fut Future.
          * @param doneCb Closure.
+         * @param cbExec Optional executor to run callback.
          */
         ChainFuture(
             GridFutureAdapter<R> fut,
-            IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb
+            IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
+            @Nullable Executor cbExec
         ) {
             this.fut = fut;
             this.doneCb = doneCb;
 
-            fut.listen(new GridFutureChainListener<>(this, doneCb));
+            fut.listen(new GridFutureChainListener<>(this, doneCb, cbExec));
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
index 947b2ad..367f5d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
@@ -17,15 +17,17 @@
 
 package org.apache.ignite.internal.util.future;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Future listener to fill chained future with converted result of the source future.
  */
-public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>>
{
+class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>>
{
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -35,21 +37,43 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte
     /** Done callback. */
     private final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb;
 
+    /** */
+    private Executor cbExec;
+
     /**
      * Constructs chain listener.
+     *
      *  @param fut Target future.
      * @param doneCb Done callback.
+     * @param cbExec Optional executor to run callback.
      */
     public GridFutureChainListener(
         GridFutureAdapter<R> fut,
-        IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb
+        IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb,
+        @Nullable Executor cbExec
     ) {
         this.fut = fut;
         this.doneCb = doneCb;
+        this.cbExec = cbExec;
     }
 
     /** {@inheritDoc} */
-    @Override public void apply(IgniteInternalFuture<T> t) {
+    @Override public void apply(final IgniteInternalFuture<T> t) {
+        if (cbExec != null) {
+            cbExec.execute(new Runnable() {
+                @Override public void run() {
+                    applyCallback(t);
+                }
+            });
+        }
+        else
+            applyCallback(t);
+    }
+
+    /**
+     * @param t Target future.
+     */
+    private void applyCallback(IgniteInternalFuture<T> t) {
         try {
             fut.onDone(doneCb.apply(t));
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
index c9d18eb..e9d74ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
@@ -211,7 +211,7 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest
{
                             tx.commit();
                         }
                         catch (Exception e) {
-                            e.printStackTrace();
+                            log.info("Ignore error: " + e);
                         }
                     }
                 }, NODES_CNT * 3, "tx-thread");

http://git-wip-us.apache.org/repos/asf/ignite/blob/3d33d241/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
index adcd144..4bc9f01 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.future;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -227,87 +228,98 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest
{
      *
      * @throws Exception In case of any exception.
      */
-    @SuppressWarnings("ErrorNotRethrown")
     public void testChaining() throws Exception {
+        checkChaining(null);
+
+        ExecutorService exec = Executors.newFixedThreadPool(1);
+
+        try {
+            checkChaining(exec);
+
+            GridFinishedFuture<Integer> fut = new GridFinishedFuture<>(1);
+
+            IgniteInternalFuture<Object> chain = fut.chain(new CX1<IgniteInternalFuture<Integer>,
Object>() {
+                @Override public Object applyx(IgniteInternalFuture<Integer> fut) throws
IgniteCheckedException {
+                    return fut.get() + 1;
+                }
+            }, exec);
+
+            assertEquals(2, chain.get());
+        }
+        finally {
+            exec.shutdown();
+        }
+    }
+
+    /**
+     * @param exec Executor for chain callback.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private void checkChaining(ExecutorService exec) throws Exception {
         final CX1<IgniteInternalFuture<Object>, Object> passThrough = new CX1<IgniteInternalFuture<Object>,
Object>() {
             @Override public Object applyx(IgniteInternalFuture<Object> f) throws IgniteCheckedException
{
                 return f.get();
             }
         };
 
-        final GridTestKernalContext ctx = new GridTestKernalContext(log);
-
-        ctx.setExecutorService(Executors.newFixedThreadPool(1));
-        ctx.setSystemExecutorService(Executors.newFixedThreadPool(1));
-
-        ctx.add(new PoolProcessor(ctx));
-        ctx.add(new GridClosureProcessor(ctx));
+        GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
+        IgniteInternalFuture<Object> chain = exec != null ? fut.chain(passThrough,
exec) : fut.chain(passThrough);
 
-        ctx.start();
+        assertFalse(fut.isDone());
+        assertFalse(chain.isDone());
 
         try {
-            // Test result returned.
-
-            GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-            IgniteInternalFuture<Object> chain = fut.chain(passThrough);
+            chain.get(20);
 
-            assertFalse(fut.isDone());
-            assertFalse(chain.isDone());
-
-            try {
-                chain.get(20);
-
-                fail("Expects timeout exception.");
-            }
-            catch (IgniteFutureTimeoutCheckedException e) {
-                info("Expected timeout exception: " + e.getMessage());
-            }
+            fail("Expects timeout exception.");
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            info("Expected timeout exception: " + e.getMessage());
+        }
 
-            fut.onDone("result");
+        fut.onDone("result");
 
-            assertEquals("result", chain.get(1));
+        assertEquals("result", chain.get(1));
 
-            // Test exception re-thrown.
+        // Test exception re-thrown.
 
-            fut = new GridFutureAdapter<>();
-            chain = fut.chain(passThrough);
+        fut = new GridFutureAdapter<>();
+        chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
 
-            fut.onDone(new ClusterGroupEmptyCheckedException("test exception"));
+        fut.onDone(new ClusterGroupEmptyCheckedException("test exception"));
 
-            try {
-                chain.get();
+        try {
+            chain.get();
 
-                fail("Expects failed with exception.");
-            }
-            catch (ClusterGroupEmptyCheckedException e) {
-                info("Expected exception: " + e.getMessage());
-            }
+            fail("Expects failed with exception.");
+        }
+        catch (ClusterGroupEmptyCheckedException e) {
+            info("Expected exception: " + e.getMessage());
+        }
 
-            // Test error re-thrown.
+        // Test error re-thrown.
 
-            fut = new GridFutureAdapter<>();
-            chain = fut.chain(passThrough);
+        fut = new GridFutureAdapter<>();
+        chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
 
-            try {
-                fut.onDone(new StackOverflowError("test error"));
+        try {
+            fut.onDone(new StackOverflowError("test error"));
 
+            if (exec == null)
                 fail("Expects failed with error.");
-            }
-            catch (StackOverflowError e) {
-                info("Expected error: " + e.getMessage());
-            }
+        }
+        catch (StackOverflowError e) {
+            info("Expected error: " + e.getMessage());
+        }
 
-            try {
-                chain.get();
+        try {
+            chain.get();
 
-                fail("Expects failed with error.");
-            }
-            catch (StackOverflowError e) {
-                info("Expected error: " + e.getMessage());
-            }
+            fail("Expects failed with error.");
         }
-        finally {
-            ctx.stop(false);
+        catch (StackOverflowError e) {
+            info("Expected error: " + e.getMessage());
         }
     }
 


Mime
View raw message