apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [1/2] incubator-apex-malhar git commit: APEXMALHAR-1983 #resolve #comment split string in setter
Date Sat, 30 Jan 2016 06:02:13 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 ab800233b -> a44e81a83


APEXMALHAR-1983 #resolve #comment split string in setter


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0627199c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0627199c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0627199c

Branch: refs/heads/devel-3
Commit: 0627199c8d3d299dfaf51e21d31859ff6322cde6
Parents: 9e77ef7
Author: Siyuan Hua <hsy541@apache.org>
Authored: Fri Jan 29 21:22:30 2016 -0800
Committer: Siyuan Hua <hsy541@apache.org>
Committed: Fri Jan 29 21:22:30 2016 -0800

----------------------------------------------------------------------
 .../malhar/kafka/AbstractKafkaInputOperator.java     | 15 ++++++++++-----
 .../apex/malhar/kafka/AbstractKafkaPartitioner.java  |  4 +++-
 .../apex/malhar/kafka/KafkaConsumerWrapper.java      |  2 ++
 .../org/apache/apex/malhar/kafka/KafkaMetrics.java   |  2 ++
 .../org/apache/apex/malhar/kafka/KafkaPartition.java |  3 +++
 .../apex/malhar/kafka/OneToManyPartitioner.java      |  2 ++
 .../apex/malhar/kafka/OneToOnePartitioner.java       |  2 ++
 .../apache/apex/malhar/kafka/PartitionStrategy.java  |  3 +++
 8 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 512f058..89104a3 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -34,12 +34,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
@@ -66,6 +69,7 @@ import com.datatorrent.api.StatsListener;
  *
  * @since 3.3.0
  */
+@InterfaceStability.Evolving
 public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>,
Operator.CheckpointListener, Partitioner<AbstractKafkaInputOperator>, StatsListener,
OffsetCommitCallback
 {
 
@@ -366,17 +370,18 @@ public abstract class AbstractKafkaInputOperator implements InputOperator,
Opera
     return Joiner.on(';').join(clusters);
   }
 
-  public void setTopics(String... topics)
+  public void setTopics(String topics)
   {
-    this.topics = topics;
+    this.topics = Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics),
String.class);
   }
 
   /**
-   * The topics the operator consumes
+   * The topics the operator consumes, separate by','
+   * Topic name can only contain ASCII alphanumerics, '.', '_' and '-'
    */
-  public String[] getTopics()
+  public String getTopics()
   {
-    return topics;
+    return Joiner.on(", ").join(topics);
   }
 
   public void setStrategy(String policy)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 53bbd2a..01907e4 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -33,6 +33,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -56,6 +57,7 @@ import com.datatorrent.api.StatsListener;
  *
  * @since 3.3.0
  */
+@InterfaceStability.Evolving
 public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKafkaInputOperator>,
StatsListener
 {
 
@@ -274,7 +276,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
     {
       return "PartitionMeta{" +
         "cluster='" + cluster + '\'' +
-        ", topicPartition=" + topicPartition +
+        ", topicPartition=" + getTopicPartition() +
         '}';
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
index cac2ad2..7a1211a 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -63,6 +64,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  *
  * @since 3.3.0
  */
+@InterfaceStability.Evolving
 public class KafkaConsumerWrapper implements Closeable
 {
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
index 12a375d..75449a1 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 
@@ -33,6 +34,7 @@ import com.datatorrent.api.AutoMetric;
  *
  * @since 3.3.0
  */
+@InterfaceStability.Evolving
 public class KafkaMetrics implements Serializable
 {
   private KafkaConsumerStats[] stats;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
index 4a4ebf3..1646ffe 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
@@ -21,9 +21,12 @@ package org.apache.apex.malhar.kafka;
 
 import java.io.Serializable;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * @since 2.1.0
  */
+@InterfaceStability.Evolving
 public class KafkaPartition implements Serializable
 {
   protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster";

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index bcd3073..3b4d3f3 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.kafka.common.PartitionInfo;
 
 /**
@@ -32,6 +33,7 @@ import org.apache.kafka.common.PartitionInfo;
  *
  * @since 3.3.0
  */
+@InterfaceStability.Evolving
 public class OneToManyPartitioner extends AbstractKafkaPartitioner
 {
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
index b787932..570bdea 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.kafka.common.PartitionInfo;
 
 import com.google.common.collect.Sets;
@@ -33,6 +34,7 @@ import com.google.common.collect.Sets;
  *
  * @since 3.3.0
  */
+@InterfaceStability.Evolving
 public class OneToOnePartitioner extends AbstractKafkaPartitioner
 {
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
index aaa35ec..7c142c5 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
@@ -18,9 +18,12 @@
  */
 package org.apache.apex.malhar.kafka;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * @since 3.3.0
  */
+@InterfaceStability.Evolving
 public enum PartitionStrategy
 {
   /**


Mime
View raw message