From: thw@apache.org
To: commits@apex.incubator.apache.org
Date: Fri, 29 Jan 2016 19:12:34 -0000
Message-Id: <680cc683d0d34fc19160cebc641b22fa@git.apache.org>
Subject: [17/50] [abbrv] incubator-apex-malhar git commit: MLHR-1947 #comment Ability to restrict number of messages per window by size

MLHR-1947 #comment Ability to restrict number of messages per window by size

Fixed count calculation

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/326db94a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/326db94a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/326db94a

Branch: refs/heads/master
Commit: 326db94aba2665dafed2ec2b2d7a464c12e78147
Parents: 6ee61fc
Author: Pramod Immaneni
Authored: Tue Dec 15 04:36:25 2015 -0800
Committer: Pramod Immaneni
Committed: Tue Dec 29 17:53:45 2015 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 88 ++++++++++++++------
 .../contrib/kafka/KafkaConsumer.java            | 30 +++----
 .../contrib/kafka/KafkaInputOperatorTest.java   | 76 ++++++++++++++---
 3 files changed, 140 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
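The commit adds a byte-size cap per application window to complement the existing count cap (maxTuplesPerWindow). As a usage illustration, here is a minimal sketch of an application configuring both caps; the application class, topic name, ZooKeeper address, and limit values are hypothetical placeholders, and only setMaxTotalMsgSizePerWindow(long) comes from this commit:

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
import org.apache.hadoop.conf.Configuration;

// Hypothetical application wiring; only setMaxTotalMsgSizePerWindow(long) is new in this commit.
public class SizeCappedKafkaApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortStringInputOperator input =
        dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
    KafkaConsumer consumer = new SimpleKafkaConsumer();
    consumer.setTopic("events");                     // hypothetical topic name
    input.setConsumer(consumer);
    input.setZookeeper("localhost:2181");            // hypothetical ZooKeeper address
    input.setMaxTuplesPerWindow(10000);              // existing cap on the number of messages per window
    input.setMaxTotalMsgSizePerWindow(1024 * 1024);  // new cap: roughly 1 MiB of message payload per window
  }
}

The two caps act independently within each window: the count cap bounds how many tuples are emitted, while the new size cap bounds the sum of the emitted message payload sizes.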
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/326db94a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index ec50615..5a5ef36 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -28,10 +28,6 @@ import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
-
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -41,7 +37,23 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.validation.Valid;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
@@ -57,25 +69,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import javax.validation.Valid;
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.cluster.Broker;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
 
 /**
  * This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.
@@ -140,7 +135,10 @@ public abstract class AbstractKafkaInputOperator implem
   @Min(1)
   private int maxTuplesPerWindow = Integer.MAX_VALUE;
+  @Min(1)
+  private long maxTotalMsgSizePerWindow = Long.MAX_VALUE;
   private transient int emitCount = 0;
+  private transient long emitTotalMsgSize = 0;
   protected IdempotentStorageManager idempotentStorageManager;
   protected transient long currentWindowId;
   protected transient int operatorId;
@@ -185,6 +183,8 @@ public abstract class AbstractKafkaInputOperator implem
   // A list store the newly discovered partitions
   private transient List<String> newWaitingPartition = new LinkedList<String>();
+
+  private transient KafkaConsumer.KafkaMessage pendingMessage;
 
   @Min(1)
   private int initialPartitionCount = 1;
@@ -214,6 +214,27 @@
     this.maxTuplesPerWindow = maxTuplesPerWindow;
   }
 
+  /**
+   * Get the maximum total size of messages to be transmitted per window. When the sum of the sizes of the messages
+   * transmitted in a window reaches this limit, no more messages are transmitted until the next window. There is
+   * one exception, however: if the size of the first message in a window is greater than the limit, it is still
+   * transmitted so that the processing of messages doesn't get stuck.
+   * @return The maximum for the total size
+   */
+  public long getMaxTotalMsgSizePerWindow() {
+    return maxTotalMsgSizePerWindow;
+  }
+
+  /**
+   * Set the maximum total size of messages to be transmitted per window. See {@link #getMaxTotalMsgSizePerWindow()}
+   * for a fuller description of this property.
+   *
+   * @param maxTotalMsgSizePerWindow The maximum for the total size
+   */
+  public void setMaxTotalMsgSizePerWindow(long maxTotalMsgSizePerWindow) {
+    this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
+  }
+
   @Override
   public void setup(OperatorContext context)
   {
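The javadoc above fully specifies the throttling rule, and it reduces to a small predicate. The following standalone sketch (a hypothetical helper, not part of this commit) restates that rule; the operator implements the same logic inline in emitTuples(), shown further below:

// Standalone restatement of the rule documented above (hypothetical helper, not part of this commit).
final class WindowSizeCap
{
  private WindowSizeCap() {}

  /**
   * May a message of messageSize bytes still be emitted in this window?
   * Emission stops once the running byte total would pass the cap, except that
   * the first message of a window is always allowed, so a message larger than
   * the cap itself cannot stall processing.
   */
  static boolean mayEmit(long maxTotalMsgSizePerWindow, long emitTotalMsgSize, int emitCount, int messageSize)
  {
    if (emitCount == 0) {
      return true; // first message of the window is always transmitted
    }
    return maxTotalMsgSizePerWindow - emitTotalMsgSize >= messageSize;
  }
}

A caller would emit while the predicate holds and park the first rejected message for the following window, which is exactly what emitTuples() below does with the pendingMessage field.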
@@ -246,6 +267,7 @@ public abstract class AbstractKafkaInputOperator implem
       replay(windowId);
     }
     emitCount = 0;
+    emitTotalMsgSize = 0;
   }
 
   protected void replay(long windowId)
@@ -388,13 +410,28 @@ public abstract class AbstractKafkaInputOperator implem
     if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
       return;
     }
-    int count = consumer.messageSize();
+    int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0);
     if (maxTuplesPerWindow > 0) {
       count = Math.min(count, maxTuplesPerWindow - emitCount);
     }
+    KafkaConsumer.KafkaMessage message = null;
     for (int i = 0; i < count; i++) {
-      KafkaConsumer.KafkaMessage message = consumer.pollMessage();
+      if (pendingMessage != null) {
+        message = pendingMessage;
+        pendingMessage = null;
+      } else {
+        message = consumer.pollMessage();
+      }
+      // If emitting this message would exceed the total size allowed in the window, don't transmit any more
+      // messages in this window. Make an exception when no message has been transmitted in the window yet and
+      // transmit at least one message, even if the limit is exceeded, so that processing doesn't get stuck.
+      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < message.msg.size())) {
+        pendingMessage = message;
+        break;
+      }
       emitTuple(message.msg);
+      emitCount++;
+      emitTotalMsgSize += message.msg.size();
       offsetStats.put(message.kafkaPart, message.offSet);
       MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
       if(offsetAndCount == null) {
@@ -403,7 +440,6 @@
         offsetAndCount.setRight(offsetAndCount.right+1);
       }
     }
-    emitCount += count;
   }
 
   public void setConsumer(K consumer)
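Note the carry-over design in the hunk above: when the size cap is hit mid-poll, the message that no longer fits is not dropped but parked in the transient pendingMessage field, and it becomes the first message emitted in the next window. That is why the count computed at the top of emitTuples() adds ((pendingMessage != null) ? 1 : 0) before the maxTuplesPerWindow limit is applied. Incrementing emitCount per emitted message, instead of adding count after the loop, keeps the count correct when the loop breaks early on the size cap; this is the "Fixed count calculation" mentioned in the commit message.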
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/326db94a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
index cf5179c..805fdc4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
@@ -18,6 +18,19 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import com.datatorrent.api.Context;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import kafka.message.Message;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
+import javax.validation.constraints.Pattern.Flag;
 import java.io.Closeable;
 import java.io.Serializable;
 import java.util.Collection;
@@ -32,23 +45,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
-import javax.validation.constraints.NotNull;
-import javax.validation.constraints.Pattern;
-import javax.validation.constraints.Pattern.Flag;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
-
-import com.datatorrent.api.Context;
-
-import kafka.message.Message;
-
 /**
  * Base Kafka Consumer class used by kafka input operator
  *

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/326db94a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 76b3550..2a5a38d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -19,8 +19,6 @@
 package com.datatorrent.contrib.kafka;
 
 import com.datatorrent.api.Attribute;
-import com.datatorrent.api.StringCodec;
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -29,10 +27,20 @@ import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
@@ -42,17 +50,8 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.LoggerFactory;
-
 public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 {
   static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -339,6 +338,61 @@
   }
 
   @Test
+  public void testMaxTotalSize() throws InterruptedException {
+    int totalCount = 1500;
+    int maxTotalSize = 500;
+
+    // initial the latch for this test
+    latch = new CountDownLatch(1);
+
+    // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
+    p.setSendCount(totalCount);
+    Thread t = new Thread(p);
+    t.start();
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDir);
+
+    Context.OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+    KafkaSinglePortStringInputOperator operator = new KafkaSinglePortStringInputOperator();
+
+    KafkaConsumer consumer = new SimpleKafkaConsumer();
+    consumer.setTopic(TEST_TOPIC);
+    consumer.setInitialOffset("earliest");
+
+    operator.setConsumer(consumer);
+    operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+    operator.setMaxTotalMsgSizePerWindow(maxTotalSize);
+
+    List<Partitioner.Partition<AbstractKafkaInputOperator<KafkaConsumer>>> partitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<KafkaConsumer>>>();
+
+    Collection<Partitioner.Partition<AbstractKafkaInputOperator<KafkaConsumer>>> newPartitions = operator.definePartitions(partitions, new StatelessPartitionerTest.PartitioningContextImpl(null, 0));
+    Assert.assertEquals(1, newPartitions.size());
+
+    operator = (KafkaSinglePortStringInputOperator)newPartitions.iterator().next().getPartitionedInstance();
+
+    CollectorTestSink sink = new CollectorTestSink();
+    operator.outputPort.setSink(sink);
+    operator.setup(context);
+    operator.activate(context);
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+
+    t.join();
+
+    operator.deactivate();
+    operator.teardown();
+    int size = 0;
+    for (Object o : sink.collectedTuples) {
+      size += ((String)o).getBytes().length;
+    }
+    Assert.assertTrue("Total emitted size comparison", size < maxTotalSize);
+  }
+
+  @Test
   public void testZookeeper() throws Exception {
     // initial the latch for this test