ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/33] ignite git commit: IGNITE-3055: IgniteDataStreamer can't be timed out
Date Fri, 22 Jul 2016 11:29:29 GMT
IGNITE-3055: IgniteDataStreamer can't be timed out


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10224dfe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10224dfe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10224dfe

Branch: refs/heads/ignite-3547
Commit: 10224dfee22b497bc671f878028881f99465a4c8
Parents: 06b24a9
Author: Vladislav Pyatkov <vldpyatkov@gmail.com>
Authored: Wed Jul 13 15:24:53 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Wed Jul 13 15:24:53 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteDataStreamer.java   |  52 +++++-
 .../IgniteDataStreamerTimeoutException.java     |  45 +++++
 .../datastreamer/DataStreamerImpl.java          |  71 ++++++--
 .../ignite/internal/util/IgniteUtils.java       |  21 +++
 .../datastreamer/DataStreamerTimeoutTest.java   | 163 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 6 files changed, 335 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/10224dfe/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 20d0057..887443f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -96,6 +96,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
     /** Default per node buffer size. */
     public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
 
+    /** Default timeout for streamer's operations. */
+    public static final long DFLT_UNLIMIT_TIMEOUT = -1;
+
     /**
      * Name of cache to stream data to.
      *
@@ -198,6 +201,29 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
     public void perNodeParallelOperations(int parallelOps);
 
     /**
+     * Sets the timeout that is used in the following cases:
+     * <ul>
+     * <li>any data addition method can be blocked when all per node parallel operations
are exhausted.
+     * The timeout defines the max time you will be blocked waiting for a permit to add a
chunk of data
+     * into the streamer;</li>
+     * <li>Total timeout time for {@link #flush()} operation;</li>
+     * <li>Total timeout time for {@link #close()} operation.</li>
+     * </ul>
+     * By default the timeout is disabled.
+     *
+     * @param timeout Timeout in milliseconds.
+     * @throws IllegalArgumentException If {@param timeout} is zero or less than {@code -1}.
+     */
+    public void timeout(long timeout);
+
+    /**
+     * Gets timeout set by {@link #timeout(long)}.
+     *
+     * @return Timeout in milliseconds.
+     */
+    public long timeout();
+
+    /**
      * Gets automatic flush frequency. Essentially, this is the time after which the
      * streamer will make an attempt to submit all data added so far to remote nodes.
      * Note that there is no guarantee that data will be delivered after this concrete
@@ -287,10 +313,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
      * @throws IgniteInterruptedException If thread has been interrupted.
      * @throws IllegalStateException If grid has been concurrently stopped or
      *      {@link #close(boolean)} has already been called on streamer.
+     * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
      * @see #allowOverwrite()
      */
     public IgniteFuture<?> addData(K key, @Nullable V val) throws CacheException, IgniteInterruptedException,
-        IllegalStateException;
+        IllegalStateException, IgniteDataStreamerTimeoutException;
 
     /**
      * Adds data for streaming on remote node. This method can be called from multiple
@@ -310,10 +337,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
      * @throws IgniteInterruptedException If thread has been interrupted.
      * @throws IllegalStateException If grid has been concurrently stopped or
      *      {@link #close(boolean)} has already been called on streamer.
+     * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
      * @see #allowOverwrite()
      */
     public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws CacheException,
IgniteInterruptedException,
-        IllegalStateException;
+        IllegalStateException, IgniteDataStreamerTimeoutException;
 
     /**
      * Adds data for streaming on remote node. This method can be called from multiple
@@ -330,10 +358,12 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
      * @param entries Collection of entries to be streamed.
      * @throws IllegalStateException If grid has been concurrently stopped or
      *      {@link #close(boolean)} has already been called on streamer.
+     * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
      * @return Future for this stream operation.
      * @see #allowOverwrite()
      */
-    public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>>
entries) throws IllegalStateException;
+    public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>>
entries) throws IllegalStateException,
+        IgniteDataStreamerTimeoutException;
 
     /**
      * Adds data for streaming on remote node. This method can be called from multiple
@@ -350,10 +380,12 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
      * @param entries Map to be streamed.
      * @throws IllegalStateException If grid has been concurrently stopped or
      *      {@link #close(boolean)} has already been called on streamer.
+     * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
      * @return Future for this stream operation.
      * @see #allowOverwrite()
      */
-    public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
+    public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException,
+        IgniteDataStreamerTimeoutException;
 
     /**
      * Streams any remaining data, but doesn't close the streamer. Data can be still added
after
@@ -368,9 +400,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
      * @throws IgniteInterruptedException If thread has been interrupted.
      * @throws IllegalStateException If grid has been concurrently stopped or
      *      {@link #close(boolean)} has already been called on streamer.
+     * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
      * @see #tryFlush()
      */
-    public void flush() throws CacheException, IgniteInterruptedException, IllegalStateException;
+    public void flush() throws CacheException, IgniteInterruptedException, IllegalStateException,
+        IgniteDataStreamerTimeoutException;
 
     /**
      * Makes an attempt to stream remaining data. This method is mostly similar to {@link
#flush},
@@ -390,8 +424,10 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
      * @param cancel {@code True} to cancel ongoing streaming operations.
      * @throws CacheException If failed to map key to node.
      * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded, only if
{@param cancel} is {@code false}.
      */
-    public void close(boolean cancel) throws CacheException, IgniteInterruptedException;
+    public void close(boolean cancel) throws CacheException, IgniteInterruptedException,
+        IgniteDataStreamerTimeoutException;
 
     /**
      * Closes data streamer. This method is identical to calling {@link #close(boolean) close(false)}
method.
@@ -401,7 +437,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable
{
      *
      * @throws CacheException If failed to close data streamer.
      * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded.
      */
-    @Override public void close() throws CacheException, IgniteInterruptedException;
+    @Override public void close() throws CacheException, IgniteInterruptedException,
+        IgniteDataStreamerTimeoutException;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/10224dfe/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java
new file mode 100644
index 0000000..c6c7367
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+/**
+ * Exception is thrown when timeout of some {@link IgniteDataStreamer} operations occurs.
+ */
+public class IgniteDataStreamerTimeoutException extends IgniteException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates new timeout exception with given error message.
+     *
+     * @param msg Error message.
+     */
+    public IgniteDataStreamerTimeoutException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates new timeout exception with given error message and optional nested exception.
+     *
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public IgniteDataStreamerTimeoutException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10224dfe/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9dc6a7f..21df559 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -46,6 +46,7 @@ import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteDataStreamerTimeoutException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -53,6 +54,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -147,6 +149,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
     private int parallelOps = DFLT_MAX_PARALLEL_OPS;
 
     /** */
+    private long timeout = DFLT_UNLIMIT_TIMEOUT;
+
+    /** */
     private long autoFlushFreq;
 
     /** Mapping. */
@@ -453,6 +458,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
     }
 
     /** {@inheritDoc} */
+    @Override public void timeout(long timeout) {
+        if (timeout < -1 || timeout == 0)
+            throw new IllegalArgumentException();
+
+        this.timeout = timeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long timeout() {
+        return this.timeout;
+    }
+
+    /** {@inheritDoc} */
     @Override public long autoFlushFrequency() {
         return autoFlushFreq;
     }
@@ -517,6 +535,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
 
             return new IgniteCacheFutureImpl<>(resFut);
         }
+        catch (IgniteDataStreamerTimeoutException e) {
+            throw e;
+        }
         catch (IgniteException e) {
             return new IgniteFinishedFutureImpl<>(e);
         }
@@ -572,7 +593,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
         catch (Throwable e) {
             resFut.onDone(e);
 
-            if (e instanceof Error)
+            if (e instanceof Error || e instanceof IgniteDataStreamerTimeoutException)
                 throw e;
 
             return new IgniteFinishedFutureImpl<>(e);
@@ -854,9 +875,20 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
 
                 boolean err = false;
 
+                long startTimeMillis = U.currentTimeMillis();
+
                 for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
                     try {
-                        fut.get();
+                        if (timeout == DFLT_UNLIMIT_TIMEOUT)
+                            fut.get();
+                        else {
+                            long timeRemain = timeout - U.currentTimeMillis() + startTimeMillis;
+
+                            if (timeRemain <= 0)
+                                throw new IgniteDataStreamerTimeoutException("Data streamer
exceeded timeout on flush.");
+
+                            fut.get(timeRemain);
+                        }
                     }
                     catch (IgniteClientDisconnectedCheckedException e) {
                         if (log.isDebugEnabled())
@@ -864,6 +896,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
 
                         throw CU.convertToCacheException(e);
                     }
+                    catch (IgniteFutureTimeoutCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to flush buffer: " + e);
+
+                        throw new IgniteDataStreamerTimeoutException("Data streamer exceeded
timeout on flush.", e);
+                    }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to flush buffer: " + e);
@@ -976,8 +1014,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
         if (log.isDebugEnabled())
             log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
 
-        IgniteCheckedException e = null;
-
         try {
             // Assuming that no methods are called on this loader after this method is called.
             if (cancel) {
@@ -993,14 +1029,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
 
             ctx.io().removeMessageListener(topic);
         }
-        catch (IgniteCheckedException e0) {
-            e = e0;
+        catch (IgniteCheckedException | IgniteDataStreamerTimeoutException e) {
+            fut.onDone(e);
+            throw e;
         }
 
-        fut.onDone(null, e != null ? e : err);
-
-        if (e != null)
-            throw e;
+        fut.onDone(err);
     }
 
     /**
@@ -1242,7 +1276,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
          */
         private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
-            U.acquire(sem);
+            if (timeout == DFLT_UNLIMIT_TIMEOUT)
+                U.acquire(sem);
+            else
+                if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to add parallel operation.");
+
+                    throw new IgniteDataStreamerTimeoutException("Data streamer exceeded
timeout when starts parallel operation.");
+                }
         }
 
         /**
@@ -1268,7 +1310,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
             assert !entries.isEmpty();
             assert curFut != null;
 
-            incrementActiveTasks();
+            try {
+                incrementActiveTasks();
+            } catch (IgniteDataStreamerTimeoutException e) {
+                curFut.onDone(e);
+                throw e;
+            }
 
             IgniteInternalFuture<Object> fut;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10224dfe/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index fa18923..08c9219 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -7447,6 +7447,27 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Tries to acquire a permit from provided semaphore during {@code timeout}.
+     *
+     * @param sem Semaphore.
+     * @param timeout The maximum time to wait.
+     * @param unit The unit of the {@code time} argument.
+     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException Wrapped {@link
InterruptedException}.
+     * @return {@code True} if acquires a permit, {@code false} another.
+     */
+    public static boolean tryAcquire(Semaphore sem, long timeout, TimeUnit unit)
+        throws IgniteInterruptedCheckedException {
+        try {
+            return sem.tryAcquire(timeout, unit);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
+
+    /**
      * Gets cache attributes for the node.
      *
      * @param n Node to get cache attributes for.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10224dfe/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
new file mode 100644
index 0000000..4e981b7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.datastreamer;
+
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteDataStreamerTimeoutException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.stream.StreamReceiver;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Test timeout for Data streamer.
+ */
+public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cacheName";
+
+    /** Timeout. */
+    public static final int TIMEOUT = 1_000;
+
+    /** Amount of entries. */
+    public static final int ENTRY_AMOUNT = 100;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * Gets cache configuration.
+     *
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(1);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setName(CACHE_NAME);
+
+        return cacheCfg;
+    }
+
+    /**
+     * Test timeout on {@code DataStreamer.addData()} method
+     * @throws Exception If fail.
+     */
+    public void testTimeoutOnCloseMethod() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        boolean thrown = false;
+
+        try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME)) {
+            ldr.timeout(TIMEOUT);
+            ldr.receiver(new TestDataReceiver());
+            ldr.perNodeBufferSize(ENTRY_AMOUNT);
+
+            for (int i=0; i < ENTRY_AMOUNT; i++)
+                ldr.addData(i, i);
+
+        }
+        catch (IgniteDataStreamerTimeoutException e) {
+            assertEquals(e.getMessage(), "Data streamer exceeded timeout on flush.");
+            thrown = true;
+        }
+        finally {
+            stopAllGrids();
+        }
+
+        assertTrue(thrown);
+    }
+
+    /**
+     * Test timeout on {@code DataStreamer.close()} method
+     * @throws Exception If fail.
+     */
+    public void testTimeoutOnAddDataMethod() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        boolean thrown = false;
+
+        IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME);
+
+        try {
+            ldr.timeout(TIMEOUT);
+            ldr.receiver(new TestDataReceiver());
+            ldr.perNodeBufferSize(ENTRY_AMOUNT/2);
+            ldr.perNodeParallelOperations(1);
+
+            try {
+                for (int i=0; i < ENTRY_AMOUNT; i++)
+                    ldr.addData(i, i);
+            }
+            catch (IgniteDataStreamerTimeoutException e) {
+                assertEquals(e.getMessage(), "Data streamer exceeded timeout when starts
parallel operation.");
+
+                thrown = true;
+            }
+
+        }
+        finally {
+            if (thrown)
+                ldr.close(true);
+
+            stopAllGrids();
+        }
+
+        assertTrue(thrown);
+    }
+
+    /**
+     * Test receiver for timeout expiration emulation.
+     */
+    private static class TestDataReceiver implements StreamReceiver {
+
+        /** Is first. */
+        boolean isFirst = true;
+
+        /** {@inheritDoc} */
+        @Override public void receive(IgniteCache cache, Collection collection) throws IgniteException
{
+            try {
+                if (isFirst)
+                    U.sleep(2 * TIMEOUT);
+
+                isFirst = false;
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10224dfe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 33aae9a..0b9345d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -138,6 +138,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSel
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerTimeoutTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateAfterLoadTest;
 import org.apache.ignite.testframework.GridTestUtils;
 
@@ -244,6 +245,7 @@ public class IgniteCacheTestSuite extends TestSuite {
             suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);
         suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class);
         suite.addTestSuite(DataStreamerImplSelfTest.class);
+        suite.addTestSuite(DataStreamerTimeoutTest.class);
         GridTestUtils.addTestIfNeeded(suite, GridCacheEntryMemorySizeSelfTest.class, ignoredTests);
         suite.addTestSuite(GridCacheClearAllSelfTest.class);
         suite.addTestSuite(GridCacheObjectToStringSelfTest.class);


Mime
View raw message