ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [35/50] [abbrv] ignite git commit: IGNITE-8697 Flink sink throws java.lang.IllegalArgumentException when running in flink cluster mode. - Fixes #4398.
Date Fri, 03 Aug 2018 09:58:48 GMT
IGNITE-8697 Flink sink throws java.lang.IllegalArgumentException when running in flink cluster
mode. - Fixes #4398.

Signed-off-by: Dmitriy Pavlov <dpavlov@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/891da2a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/891da2a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/891da2a5

Branch: refs/heads/ignite-8446
Commit: 891da2a5b61e2aa70a0decf5afca91f5294d94b9
Parents: 137dd06
Author: samaitra <saikat.maitra@gmail.com>
Authored: Wed Aug 1 21:02:02 2018 +0300
Committer: Dmitriy Pavlov <dpavlov@apache.org>
Committed: Wed Aug 1 21:02:02 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/sink/flink/IgniteSink.java    |  88 +++++------
 .../sink/flink/FlinkIgniteSinkSelfTest.java     | 154 +++----------------
 .../flink/src/test/resources/example-ignite.xml |   7 +-
 3 files changed, 71 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/891da2a5/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
----------------------------------------------------------------------
diff --git a/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java b/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
index 2f18f80..ad29490 100644
--- a/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
+++ b/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
@@ -18,10 +18,12 @@
 package org.apache.ignite.sink.flink;
 
 import java.util.Map;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -34,7 +36,7 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> {
     private static final long DFLT_FLUSH_FREQ = 10000L;
 
     /** Logger. */
-    private final transient IgniteLogger log;
+    private transient IgniteLogger log;
 
     /** Automatic flush frequency. */
     private long autoFlushFrequency = DFLT_FLUSH_FREQ;
@@ -43,13 +45,19 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> {
     private boolean allowOverwrite = false;
 
     /** Flag for stopped state. */
-    private static volatile boolean stopped = true;
+    private volatile boolean stopped = true;
+
+    /** Ignite instance. */
+    protected transient Ignite ignite;
+
+    /** Ignite Data streamer instance. */
+    protected transient IgniteDataStreamer streamer;
 
     /** Ignite grid configuration file. */
-    private static String igniteCfgFile;
+    protected final String igniteCfgFile;
 
     /** Cache name. */
-    private static String cacheName;
+    protected final String cacheName;
 
     /**
      * Gets the cache name.
@@ -70,6 +78,15 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> {
     }
 
     /**
+     * Gets the Ignite instance.
+     *
+     * @return Ignite instance.
+     */
+    public Ignite getIgnite() {
+        return ignite;
+    }
+
+    /**
      * Obtains data flush frequency.
      *
      * @return Flush frequency.
@@ -109,12 +126,10 @@ public class IgniteSink<IN> extends RichSinkFunction<IN>
{
      * Default IgniteSink constructor.
      *
      * @param cacheName Cache name.
-     * @param igniteCfgFile Ignite configuration file.
      */
     public IgniteSink(String cacheName, String igniteCfgFile) {
         this.cacheName = cacheName;
         this.igniteCfgFile = igniteCfgFile;
-        this.log = SinkContext.getIgnite().log();
     }
 
     /**
@@ -122,13 +137,26 @@ public class IgniteSink<IN> extends RichSinkFunction<IN>
{
      *
      * @throws IgniteException If failed.
      */
+    @Override
     @SuppressWarnings("unchecked")
-    public void start() throws IgniteException {
+    public void open(Configuration parameter) {
         A.notNull(igniteCfgFile, "Ignite config file");
         A.notNull(cacheName, "Cache name");
 
-        SinkContext.getStreamer().autoFlushFrequency(autoFlushFrequency);
-        SinkContext.getStreamer().allowOverwrite(allowOverwrite);
+        try {
+            // if an ignite instance is already started in same JVM then use it.
+            this.ignite = Ignition.ignite();
+        } catch (IgniteIllegalStateException e) {
+            this.ignite = Ignition.start(igniteCfgFile);
+        }
+
+        this.ignite.getOrCreateCache(cacheName);
+
+        this.log = this.ignite.log();
+
+        this.streamer = this.ignite.dataStreamer(cacheName);
+        this.streamer.autoFlushFrequency(autoFlushFrequency);
+        this.streamer.allowOverwrite(allowOverwrite);
 
         stopped = false;
     }
@@ -138,15 +166,14 @@ public class IgniteSink<IN> extends RichSinkFunction<IN>
{
      *
      * @throws IgniteException If failed.
      */
-    public void stop() throws IgniteException {
+    @Override
+    public void close() {
         if (stopped)
             return;
 
         stopped = true;
 
-        SinkContext.getStreamer().close();
-        SinkContext.getIgnite().cache(cacheName).close();
-        SinkContext.getIgnite().close();
+        this.streamer.close();
     }
 
     /**
@@ -162,43 +189,10 @@ public class IgniteSink<IN> extends RichSinkFunction<IN>
{
             if (!(in instanceof Map))
                 throw new IgniteException("Map as a streamer input is expected!");
 
-            SinkContext.getStreamer().addData((Map)in);
+            this.streamer.addData((Map)in);
         }
         catch (Exception e) {
             log.error("Error while processing IN of " + cacheName, e);
         }
     }
-
-    /**
-     * Streamer context initializing grid and data streamer instances on demand.
-     */
-    private static class SinkContext {
-        /** Constructor. */
-        private SinkContext() {
-        }
-
-        /** Instance holder. */
-        private static class Holder {
-            private static final Ignite IGNITE = Ignition.start(igniteCfgFile);
-            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
-        }
-
-        /**
-         * Obtains grid instance.
-         *
-         * @return Grid instance.
-         */
-        private static Ignite getIgnite() {
-            return Holder.IGNITE;
-        }
-
-        /**
-         * Obtains data streamer instance.
-         *
-         * @return Data streamer instance.
-         */
-        private static IgniteDataStreamer getStreamer() {
-            return Holder.STREAMER;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/891da2a5/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java
b/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java
index 50eedb8..eb59379 100644
--- a/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java
+++ b/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java
@@ -19,19 +19,11 @@ package org.apache.ignite.sink.flink;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.CacheEvent;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
-
 /**
  * Tests for {@link IgniteSink}.
  */
@@ -39,147 +31,51 @@ public class FlinkIgniteSinkSelfTest extends GridCommonAbstractTest {
     /** Cache name. */
     private static final String TEST_CACHE = "testCache";
 
-    /** Cache entries count. */
-    private static final int CACHE_ENTRY_COUNT = 10000;
-
-    /** Streaming events for testing. */
-    private static final long DFLT_STREAMING_EVENT = 10000;
-
-    /** Ignite instance. */
-    private Ignite ignite;
-
     /** Ignite test configuration file. */
     private static final String GRID_CONF_FILE = "modules/flink/src/test/resources/example-ignite.xml";
 
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 20_000;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected void beforeTest() throws Exception {
-        IgniteConfiguration cfg = loadConfiguration(GRID_CONF_FILE);
-
-        cfg.setClientMode(false);
-
-        ignite = startGrid("igniteServerNode", cfg);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * Tests for the Flink sink.
-     * Ignite started in sink based on what is specified in the configuration file.
-     *
-     * @throws Exception
-     */
-    @SuppressWarnings("unchecked")
-    public void testFlinkIgniteSink() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.getConfig().disableSysoutLogging();
+    public void testIgniteSink() throws Exception {
+        Configuration configuration = new Configuration();
 
         IgniteSink igniteSink = new IgniteSink(TEST_CACHE, GRID_CONF_FILE);
 
         igniteSink.setAllowOverwrite(true);
 
-        igniteSink.setAutoFlushFrequency(10);
+        igniteSink.setAutoFlushFrequency(1L);
+
+        igniteSink.open(configuration);
 
-        igniteSink.start();
+        Map<String, String> myData = new HashMap<>();
+        myData.put("testData", "testValue");
 
-        CacheListener listener = subscribeToPutEvents();
+        igniteSink.invoke(myData);
 
-        DataStream<Map> stream = env.addSource(new SourceFunction<Map>() {
+        /** waiting for a small duration for the cache flush to complete */
+        Thread.sleep(2000);
 
-            private boolean running = true;
+        assertEquals("testValue", igniteSink.getIgnite().getOrCreateCache(TEST_CACHE).get("testData"));
+    }
 
-            @Override public void run(SourceContext<Map> ctx) throws Exception {
-                Map testDataMap = new HashMap<>();
-                long cnt = 0;
+    public void testIgniteSinkStreamExecution() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-                while (running && (cnt < DFLT_STREAMING_EVENT))  {
-                    testDataMap.put(cnt, "ignite-" + cnt);
-                    cnt++;
-                }
+        IgniteSink igniteSink = new IgniteSink(TEST_CACHE, GRID_CONF_FILE);
 
-                ctx.collect(testDataMap);
-            }
+        igniteSink.setAllowOverwrite(true);
 
-            @Override public void cancel() {
-                running = false;
-            }
-        }).setParallelism(1);
+        igniteSink.setAutoFlushFrequency(1);
 
-        assertEquals(0, ignite.cache(TEST_CACHE).size());
+        Map<String, String> myData = new HashMap<>();
+        myData.put("testdata", "testValue");
+        DataStream<Map> stream = env.fromElements(myData);
 
-        // sink data into the grid.
         stream.addSink(igniteSink);
-
         try {
             env.execute();
-
-            CountDownLatch latch = listener.getLatch();
-
-            // Enough events was handled in 10 seconds. Limited by test's timeout.
-            latch.await();
-
-            unsubscribeToPutEvents(listener);
-
-            assertEquals(DFLT_STREAMING_EVENT, ignite.getOrCreateCache(TEST_CACHE).size());
-
-            for (long i = 0; i < DFLT_STREAMING_EVENT; i++)
-                assertEquals("ignite-" + i, ignite.getOrCreateCache(TEST_CACHE).get(i));
-
-        }
-        finally {
-            igniteSink.stop();
         }
-    }
-
-    /**
-     * Sets a listener for {@link EventType#EVT_CACHE_OBJECT_PUT}.
-     *
-     * @return Cache listener.
-     */
-    private CacheListener subscribeToPutEvents() {
-        // Listen to cache PUT events and expect as many as messages as test data items.
-        CacheListener listener = new CacheListener();
-
-        ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).localListen(listener,
EVT_CACHE_OBJECT_PUT);
-
-        return listener;
-    }
-
-    /**
-     * Removes the listener for {@link EventType#EVT_CACHE_OBJECT_PUT}.
-     *
-     * @param listener Cache listener.
-     */
-    private void unsubscribeToPutEvents(CacheListener listener) {
-        ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopLocalListen(listener,
EVT_CACHE_OBJECT_PUT);
-    }
-
-    /** Listener. */
-    private class CacheListener implements IgnitePredicate<CacheEvent> {
-        private final CountDownLatch latch = new CountDownLatch(CACHE_ENTRY_COUNT);
-
-        /** @return Latch. */
-        public CountDownLatch getLatch() {
-            return latch;
-        }
-
-        /**
-         * @param evt Cache Event.
-         * @return {@code true}.
-         */
-        @Override public boolean apply(CacheEvent evt) {
-            latch.countDown();
-
-            return true;
+        catch (Exception e) {
+            e.printStackTrace();
+            fail("Stream execution process failed.");
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/891da2a5/modules/flink/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/flink/src/test/resources/example-ignite.xml b/modules/flink/src/test/resources/example-ignite.xml
index b8ddc8f..d4f4dc1 100644
--- a/modules/flink/src/test/resources/example-ignite.xml
+++ b/modules/flink/src/test/resources/example-ignite.xml
@@ -31,7 +31,7 @@
         http://www.springframework.org/schema/util/spring-util.xsd">
     <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
         <!-- Enable client mode. -->
-        <property name="clientMode" value="true"/>
+        <property name="clientMode" value="false"/>
 
         <!-- Cache accessed from IgniteSink. -->
         <property name="cacheConfiguration">
@@ -49,6 +49,9 @@
             <list>
                 <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
+                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
+
             </list>
         </property>
 
@@ -59,7 +62,7 @@
                     <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                         <property name="addresses">
                             <list>
-                                <value>127.0.0.1:47500</value>
+                                <value>127.0.0.1:47500..47509</value>
                             </list>
                         </property>
                     </bean>


Mime
View raw message