ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [58/69] [abbrv] ignite git commit: IGNITE-3665: Updated KafkaStreamer dependencies. - Fixes #957.
Date Tue, 13 Sep 2016 09:53:50 GMT
IGNITE-3665: Updated KafkaStreamer dependencies. - Fixes #957.

Signed-off-by: shtykh_roman <rshtykh@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/843979db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/843979db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/843979db

Branch: refs/heads/ignite-3199-1
Commit: 843979dbbdf65bc005d2cda36fe94b68eb8fd5e4
Parents: 39ec7d0
Author: shtykh_roman <rshtykh@yahoo.com>
Authored: Fri Sep 9 17:14:21 2016 +0900
Committer: shtykh_roman <rshtykh@yahoo.com>
Committed: Fri Sep 9 17:14:21 2016 +0900

----------------------------------------------------------------------
 .../stream/kafka/connect/IgniteSinkConnector.java    |  9 +++++++++
 .../stream/kafka/connect/IgniteSourceConnector.java  |  9 +++++++++
 .../kafka/connect/IgniteSinkConnectorTest.java       | 15 ++++++++++-----
 .../kafka/connect/IgniteSourceConnectorTest.java     | 14 ++++++++------
 modules/osgi-karaf/src/main/resources/features.xml   | 12 ++++++------
 parent/pom.xml                                       |  4 +---
 6 files changed, 43 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
index 9385920..3fbfd9c 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -34,6 +35,9 @@ public class IgniteSinkConnector extends SinkConnector {
     /** Sink properties. */
     private Map<String, String> configProps;
 
+    /** Expected configurations. */
+    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+
     /** {@inheritDoc} */
     @Override public String version() {
         return AppInfoParser.getVersion();
@@ -88,4 +92,9 @@ public class IgniteSinkConnector extends SinkConnector {
     @Override public void stop() {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
index 59e2ed0..986888e 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -36,6 +37,9 @@ public class IgniteSourceConnector extends SourceConnector {
     /** Source properties. */
     private Map<String, String> configProps;
 
+    /** Expected configurations. */
+    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+
     /** {@inheritDoc} */
     @Override public String version() {
         return AppInfoParser.getVersion();
@@ -78,4 +82,9 @@ public class IgniteSourceConnector extends SourceConnector {
     @Override public void stop() {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index 1814c69..efa2fa2 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.stream.kafka.TestKafkaBroker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -40,12 +41,12 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.FutureCallback;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
-import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.mock;
 
 /**
@@ -59,7 +60,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
     private static final String CACHE_NAME = "testCache";
 
     /** Test topics. */
-    private static final String[] TOPICS = {"test1", "test2"};
+    private static final String[] TOPICS = {"sink-test1", "sink-test2"};
 
     /** Kafka partition. */
     private static final int PARTITIONS = 3;
@@ -67,6 +68,9 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
     /** Kafka replication factor. */
     private static final int REPLICATION_FACTOR = 1;
 
+    /** Worker id. */
+    private static final String WORKER_ID = "workerId";
+
     /** Test Kafka broker. */
     private TestKafkaBroker kafkaBroker;
 
@@ -96,9 +100,9 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
         WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
 
         OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class);
-        offBackingStore.configure(anyObject(Map.class));
+        offBackingStore.configure(workerCfg);
 
-        worker = new Worker(workerCfg, offBackingStore);
+        worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore);
         worker.start();
 
         herder = new StandaloneHerder(worker);
@@ -211,7 +215,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
     private Map<String, String> makeSinkProps(String topics) {
         Map<String, String> props = new HashMap<>();
 
-        props.put(ConnectorConfig.TOPICS_CONFIG, topics);
+        props.put(SinkConnector.TOPICS_CONFIG, topics);
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
         props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector");
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
@@ -239,6 +243,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
         props.put("key.converter.schemas.enable", "false");
         props.put("value.converter.schemas.enable", "false");
         props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
         // fast flushing for testing.
         props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
index 7cdb09c..a3ce10e 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -26,13 +26,11 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.CacheEvent;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.stream.kafka.TestKafkaBroker;
@@ -43,6 +41,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -68,7 +67,10 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
     private static final String CACHE_NAME = "testCache";
 
     /** Test topics created by connector. */
-    private static final String[] TOPICS = {"test1", "test2"};
+    private static final String[] TOPICS = {"src-test1", "src-test2"};
+
+    /** Worker id. */
+    private static final String WORKER_ID = "workerId";
 
     /** Test Kafka broker. */
     private TestKafkaBroker kafkaBroker;
@@ -104,9 +106,9 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest
{
         WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
 
         MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
-        offBackingStore.configure(workerCfg.originals());
+        offBackingStore.configure(workerCfg);
 
-        worker = new Worker(workerCfg, offBackingStore);
+        worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore);
         worker.start();
 
         herder = new StandaloneHerder(worker);
@@ -280,7 +282,6 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest
{
                 }
             }, 20_000);
 
-
             info("Waiting for unexpected records for 5 secs.");
 
             assertFalse(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -345,6 +346,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest
{
         props.put("key.converter.schemas.enable", "false");
         props.put("value.converter.schemas.enable", "false");
         props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
         // fast flushing for testing.
         props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/osgi-karaf/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/modules/osgi-karaf/src/main/resources/features.xml b/modules/osgi-karaf/src/main/resources/features.xml
index 584429d..0f761f1 100644
--- a/modules/osgi-karaf/src/main/resources/features.xml
+++ b/modules/osgi-karaf/src/main/resources/features.xml
@@ -154,7 +154,7 @@
 
     <feature name="ignite-kafka" version="${project.version}" description="Apache Ignite
:: Kafka">
         <details>
-            <![CDATA[The Apache Ignite Kafka module + dependencies. This module installs
the Scala 2.1 library bundle.]]>
+            <![CDATA[The Apache Ignite Kafka module + dependencies. This module installs
the Scala 2.11 library bundle.]]>
         </details>
         <feature prerequisite="true">wrap</feature>
         <bundle start="true" dependency="true">mvn:org.scala-lang/scala-library/${scala211.library.version}</bundle>
@@ -190,14 +190,14 @@
     </feature>   
 
     <feature name="ignite-rest-http" version="${project.version}" description="Apache
Ignite :: REST HTTP">
-         <!-- NOTICE: XOM cannot be included by default due to an incompatible license;

+         <!-- NOTICE: XOM cannot be included by default due to an incompatible license;
                       please review its license model and install the dependency manually
if you agree. -->
         <details>
-            <![CDATA[The Apache Ignite REST HTTP module + dependencies. 
-            
+            <![CDATA[The Apache Ignite REST HTTP module + dependencies.
+
             Installing this feature will trigger the installation of the 'http' feature from
the Apache Karaf distribution.
-            
-            NOTE: Before using this feature you must review the license of the XOM bundle
and install it manually if you accept it: 
+
+            NOTE: Before using this feature you must review the license of the XOM bundle
and install it manually if you accept it:
             install -s mvn:xom/xom/1.2.5]]>
         </details>
         <feature dependency="true">http</feature>

http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 246c36b..b7b5be2 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -86,9 +86,7 @@
         <jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>
         <jsonlib.version>2.4</jsonlib.version>
         <jtidy.version>r938</jtidy.version>
-        <kafka.bundle.version>0.9.0.0_1</kafka.bundle.version>
-        <kafka.clients.bundle.version>0.9.0.0_1</kafka.clients.bundle.version>
-        <kafka.version>0.9.0.0</kafka.version>
+        <kafka.version>0.10.0.1</kafka.version>
         <karaf.version>4.0.2</karaf.version>
         <lucene.bundle.version>3.5.0_1</lucene.bundle.version>
         <lucene.version>3.5.0</lucene.version>


Mime
View raw message