Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 61BE7200CD2 for ; Thu, 13 Jul 2017 05:06:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6020816A6E5; Thu, 13 Jul 2017 03:06:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A74B716A6E0 for ; Thu, 13 Jul 2017 05:06:53 +0200 (CEST) Received: (qmail 48069 invoked by uid 500); 13 Jul 2017 03:06:36 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 47913 invoked by uid 99); 13 Jul 2017 03:06:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Jul 2017 03:06:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 38E01F21A9; Thu, 13 Jul 2017 03:06:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: takidau@apache.org To: commits@beam.apache.org Date: Thu, 13 Jul 2017 03:06:15 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/50] [abbrv] beam git commit: [BEAM-2534] Handle offset gaps in Kafka messages. archived-at: Thu, 13 Jul 2017 03:06:54 -0000 [BEAM-2534] Handle offset gaps in Kafka messages. KafkaIO logged a warning whenever there was a gap in offsets between consecutive messages. However, Kafka also supports 'KV'-store-style (log-compacted) topics, where some messages are deleted, leading to legitimate gaps in offsets. This PR removes the warning and accounts for offset gaps in the backlog estimate. 
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48627038 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48627038 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48627038 Branch: refs/heads/DSL_SQL Commit: 48627038a331a4f142d260ebf347693941113b75 Parents: da3206c Author: Raghu Angadi Authored: Wed Jun 28 12:07:06 2017 -0700 Committer: Tyler Akidau Committed: Wed Jul 12 20:00:59 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 49 ++++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/48627038/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 702bdd3..e520367 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -904,6 +904,22 @@ public class KafkaIO { return name; } + // Maintains approximate average over last 1000 elements + private static class MovingAvg { + private static final int MOVING_AVG_WINDOW = 1000; + private double avg = 0; + private long numUpdates = 0; + + void update(double quantity) { + numUpdates++; + avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); + } + + double get() { + return avg; + } + } + // maintains state of each assigned partition (buffered records, consumed offset, etc) private static class PartitionState { private final TopicPartition topicPartition; @@ -911,9 +927,8 @@ public class KafkaIO { private long latestOffset; private Iterator> recordIter 
= Collections.emptyIterator(); - // simple moving average for size of each record in bytes - private double avgRecordSize = 0; - private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements + private MovingAvg avgRecordSize = new MovingAvg(); + private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled. PartitionState(TopicPartition partition, long nextOffset) { this.topicPartition = partition; @@ -921,17 +936,13 @@ public class KafkaIO { this.latestOffset = UNINITIALIZED_OFFSET; } - // update consumedOffset and avgRecordSize - void recordConsumed(long offset, int size) { + // Update consumedOffset, avgRecordSize, and avgOffsetGap + void recordConsumed(long offset, int size, long offsetGap) { nextOffset = offset + 1; - // this is always updated from single thread. probably not worth making it an AtomicDouble - if (avgRecordSize <= 0) { - avgRecordSize = size; - } else { - // initially, first record heavily contributes to average. - avgRecordSize += ((size - avgRecordSize) / movingAvgWindow); - } + // This is always updated from single thread. Probably not worth making atomic. 
+ avgRecordSize.update(size); + avgOffsetGap.update(offsetGap); } synchronized void setLatestOffset(long latestOffset) { @@ -944,14 +955,15 @@ public class KafkaIO { if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) { return UnboundedReader.BACKLOG_UNKNOWN; } - return (long) (backlogMessageCount * avgRecordSize); + return (long) (backlogMessageCount * avgRecordSize.get()); } synchronized long backlogMessageCount() { if (latestOffset < 0 || nextOffset < 0) { return UnboundedReader.BACKLOG_UNKNOWN; } - return Math.max(0, (latestOffset - nextOffset)); + double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get()); + return Math.max(0, (long) Math.ceil(remaining)); } } @@ -1154,14 +1166,11 @@ public class KafkaIO { continue; } - // sanity check - if (offset != expected) { - LOG.warn("{}: gap in offsets for {} at {}. {} records missing.", - this, pState.topicPartition, expected, offset - expected); - } + long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled. if (curRecord == null) { LOG.info("{}: first record offset {}", name, offset); + offsetGap = 0; } curRecord = null; // user coders below might throw. @@ -1182,7 +1191,7 @@ public class KafkaIO { int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 0 : rawRecord.value().length); - pState.recordConsumed(offset, recordSize); + pState.recordConsumed(offset, recordSize, offsetGap); bytesRead.inc(recordSize); bytesReadBySplit.inc(recordSize); return true;