apex-commits mailing list archives

From t..@apache.org
Subject [17/50] [abbrv] incubator-apex-malhar git commit: MLHR-1947 #comment Ability to restrict number of messages per window by size
Date Fri, 29 Jan 2016 19:12:34 GMT
MLHR-1947 #comment Ability to restrict number of messages per window by size

Fixed count calculation
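
For context, here is a minimal sketch of how an application could enable the new cap, assuming a typical Apex StreamingApplication. The setters setMaxTotalMsgSizePerWindow() (added by this commit) and setMaxTuplesPerWindow() (pre-existing) are real; the application class, topic name and ZooKeeper address below are illustrative assumptions.

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
import org.apache.hadoop.conf.Configuration;

// Hypothetical application wiring; only the operator/consumer setters come from the commit.
public class KafkaReadApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortStringInputOperator input =
        dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());

    SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
    consumer.setTopic("events");            // assumed topic name
    consumer.setInitialOffset("earliest");
    input.setConsumer(consumer);
    input.setZookeeper("localhost:2181");   // assumed ZooKeeper address

    // Existing cap on message count per window, plus the new cap on total bytes per window.
    input.setMaxTuplesPerWindow(10000);
    input.setMaxTotalMsgSizePerWindow(1L << 20); // roughly 1 MiB per window
  }
}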


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/326db94a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/326db94a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/326db94a

Branch: refs/heads/master
Commit: 326db94aba2665dafed2ec2b2d7a464c12e78147
Parents: 6ee61fc
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Tue Dec 15 04:36:25 2015 -0800
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Tue Dec 29 17:53:45 2015 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 88 ++++++++++++++------
 .../contrib/kafka/KafkaConsumer.java            | 30 +++----
 .../contrib/kafka/KafkaInputOperatorTest.java   | 76 ++++++++++++++---
 3 files changed, 140 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/326db94a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index ec50615..5a5ef36 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -28,10 +28,6 @@ import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
-
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -41,7 +37,23 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.validation.Valid;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
@@ -57,25 +69,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.validation.Valid;
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
 
 /**
  * This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.&nbsp;
@@ -140,7 +135,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
 
   @Min(1)
   private int maxTuplesPerWindow = Integer.MAX_VALUE;
+  @Min(1)
+  private long maxTotalMsgSizePerWindow = Long.MAX_VALUE;
   private transient int emitCount = 0;
+  private transient long emitTotalMsgSize = 0;
   protected IdempotentStorageManager idempotentStorageManager;
   protected transient long currentWindowId;
   protected transient int operatorId;
@@ -185,6 +183,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   // A list store the newly discovered partitions
   private transient List<KafkaPartition> newWaitingPartition = new LinkedList<KafkaPartition>();
 
+  private transient KafkaConsumer.KafkaMessage pendingMessage;
+
   @Min(1)
   private int initialPartitionCount = 1;
 
@@ -214,6 +214,27 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     this.maxTuplesPerWindow = maxTuplesPerWindow;
   }
 
+  /**
+   * Get the maximum total size of messages to be transmitted per window. When the sum of the message sizes transmitted
+   * in a window reaches this limit no more messages are transmitted till the next window. There is one exception
+   * however, if the size of the first message in a window is greater than the limit it is still transmitted so that the
+   * processing of messages doesn't get stuck.
+   * @return The maximum for the total size
+     */
+  public long getMaxTotalMsgSizePerWindow() {
+    return maxTotalMsgSizePerWindow;
+  }
+
+  /**
+   * Set the maximum total size of messages to be transmitted per window. See {@link #getMaxTotalMsgSizePerWindow()} for
+   * more description about this property.
+   *
+   * @param maxTotalMsgSizePerWindow The maximum for the total size
+     */
+  public void setMaxTotalMsgSizePerWindow(long maxTotalMsgSizePerWindow) {
+    this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
+  }
+
   @Override
   public void setup(OperatorContext context)
   {
@@ -246,6 +267,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       replay(windowId);
     }
     emitCount = 0;
+    emitTotalMsgSize = 0;
   }
 
   protected void replay(long windowId)
@@ -388,13 +410,28 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
       return;
     }
-    int count = consumer.messageSize();
+    int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0);
     if (maxTuplesPerWindow > 0) {
       count = Math.min(count, maxTuplesPerWindow - emitCount);
     }
+    KafkaConsumer.KafkaMessage message = null;
     for (int i = 0; i < count; i++) {
-      KafkaConsumer.KafkaMessage message = consumer.pollMessage();
+      if (pendingMessage != null) {
+        message = pendingMessage;
+        pendingMessage = null;
+      } else {
+        message = consumer.pollMessage();
+      }
+      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
+      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
+      // message even if the condition is violated so that the processing doesn't get stuck
+      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < message.msg.size())) {
+        pendingMessage = message;
+        break;
+      }
       emitTuple(message.msg);
+      emitCount++;
+      emitTotalMsgSize += message.msg.size();
       offsetStats.put(message.kafkaPart, message.offSet);
       MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
       if(offsetAndCount == null) {
@@ -403,7 +440,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         offsetAndCount.setRight(offsetAndCount.right+1);
       }
     }
-    emitCount += count;
   }
 
   public void setConsumer(K consumer)
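
In plain terms, the reworked emitTuples() keeps a per-window byte budget: it drains the consumer queue, stops once the next message would push the total past maxTotalMsgSizePerWindow, parks that message in pendingMessage for the next window, and always lets the first message of a window through so a single oversized message cannot stall processing. Below is a simplified, self-contained sketch of that pattern, assuming a plain in-memory queue in place of the real KafkaConsumer and omitting the maxTuplesPerWindow clamp for brevity.

import java.util.ArrayDeque;
import java.util.Queue;

// Simplified sketch of the per-window size budget added in this commit.
// Queue<byte[]> stands in for KafkaConsumer.pollMessage(); the counters mirror
// emitCount / emitTotalMsgSize in AbstractKafkaInputOperator.
public class SizeBudgetEmitter
{
  private final long maxTotalMsgSizePerWindow;
  private final Queue<byte[]> queue = new ArrayDeque<byte[]>();
  private byte[] pendingMessage;   // at most one message carried over to the next window
  private int emitCount;
  private long emitTotalMsgSize;

  public SizeBudgetEmitter(long maxTotalMsgSizePerWindow)
  {
    this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
  }

  public void beginWindow()
  {
    // Reset the per-window counters, as the commit does in beginWindow()
    emitCount = 0;
    emitTotalMsgSize = 0;
  }

  public void emitTuples()
  {
    // A carried-over message counts toward the number of messages available this window
    int count = queue.size() + ((pendingMessage != null) ? 1 : 0);
    for (int i = 0; i < count; i++) {
      byte[] message;
      if (pendingMessage != null) {
        message = pendingMessage;
        pendingMessage = null;
      } else {
        message = queue.poll();
      }
      // Stop once the budget would be exceeded, except for the first message of the
      // window, which is always emitted so processing cannot get stuck
      if (emitCount > 0 && (maxTotalMsgSizePerWindow - emitTotalMsgSize) < message.length) {
        pendingMessage = message;
        break;
      }
      emit(message);
      emitCount++;
      emitTotalMsgSize += message.length;
    }
  }

  protected void emit(byte[] message)
  {
    // downstream emission stands in for emitTuple(message.msg)
  }
}

Note that at most one message is ever buffered across windows, so the carry-over adds only constant memory overhead.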

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/326db94a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
index cf5179c..805fdc4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
@@ -18,6 +18,19 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import com.datatorrent.api.Context;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import kafka.message.Message;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
+import javax.validation.constraints.Pattern.Flag;
 import java.io.Closeable;
 import java.io.Serializable;
 import java.util.Collection;
@@ -32,23 +45,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import javax.validation.constraints.NotNull;
-import javax.validation.constraints.Pattern;
-import javax.validation.constraints.Pattern.Flag;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
-
-import com.datatorrent.api.Context;
-
-import kafka.message.Message;
-
 /**
  * Base Kafka Consumer class used by kafka input operator
  *

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/326db94a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 76b3550..2a5a38d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -19,8 +19,6 @@
 package com.datatorrent.contrib.kafka;
 
 import com.datatorrent.api.Attribute;
-import com.datatorrent.api.StringCodec;
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -29,10 +27,20 @@ import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
@@ -42,17 +50,8 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.LoggerFactory;
-
 public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 {
   static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -339,6 +338,61 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   }
 
   @Test
+  public void testMaxTotalSize() throws InterruptedException {
+    int totalCount = 1500;
+    int maxTotalSize = 500;
+
+    // initialize the latch for this test
+    latch = new CountDownLatch(1);
+
+    // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
+    p.setSendCount(totalCount);
+    Thread t = new Thread(p);
+    t.start();
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDir);
+
+    Context.OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+    KafkaSinglePortStringInputOperator operator = new KafkaSinglePortStringInputOperator();
+
+    KafkaConsumer consumer = new SimpleKafkaConsumer();
+    consumer.setTopic(TEST_TOPIC);
+    consumer.setInitialOffset("earliest");
+
+    operator.setConsumer(consumer);
+    operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+    operator.setMaxTotalMsgSizePerWindow(maxTotalSize);
+
+    List<Partitioner.Partition<AbstractKafkaInputOperator<KafkaConsumer>>> partitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<KafkaConsumer>>>();
+
+    Collection<Partitioner.Partition<AbstractKafkaInputOperator<KafkaConsumer>>> newPartitions = operator.definePartitions(partitions, new StatelessPartitionerTest.PartitioningContextImpl(null, 0));
+    Assert.assertEquals(1, newPartitions.size());
+
+    operator = (KafkaSinglePortStringInputOperator)newPartitions.iterator().next().getPartitionedInstance();
+
+    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+    operator.outputPort.setSink(sink);
+    operator.setup(context);
+    operator.activate(context);
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+
+    t.join();
+
+    operator.deactivate();
+    operator.teardown();
+    int size = 0;
+    for (Object o : sink.collectedTuples) {
+      size += ((String)o).getBytes().length;
+    }
+    Assert.assertTrue("Total emitted size comparison", size < maxTotalSize);
+  }
+
+  @Test
   public void testZookeeper() throws Exception
   {
     // initial the latch for this test

