Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 92E6610650 for ; Wed, 26 Aug 2015 01:42:01 +0000 (UTC) Received: (qmail 84228 invoked by uid 500); 26 Aug 2015 01:42:01 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 84198 invoked by uid 500); 26 Aug 2015 01:42:01 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 84189 invoked by uid 99); 26 Aug 2015 01:42:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Aug 2015 01:42:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 29AB4E01CA; Wed, 26 Aug 2015 01:42:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: roshannaik@apache.org To: commits@flume.apache.org Message-Id: <7bf7c830ba974a3aa7678287ff789747@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flume git commit: FLUME-2754 - Hive Sink skipping first transaction in each Batch of Hive Transactions Date: Wed, 26 Aug 2015 01:42:01 +0000 (UTC) Repository: flume Updated Branches: refs/heads/trunk fff13b5e0 -> 318da2088 FLUME-2754 - Hive Sink skipping first transaction in each Batch of Hive Transactions (Deepesh Khandelwal via Roshan Naik) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/318da208 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/318da208 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/318da208 Branch: refs/heads/trunk Commit: 
318da208844d02ed7554724ae526cefe94dd894c Parents: fff13b5 Author: Roshan Naik Authored: Tue Aug 25 18:38:14 2015 -0700 Committer: Roshan Naik Committed: Tue Aug 25 18:39:23 2015 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/hive/HiveWriter.java | 10 ++++-- .../apache/flume/sink/hive/TestHiveWriter.java | 32 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java index aa8576e..46309be 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java @@ -88,6 +88,7 @@ class HiveWriter { this.serializer = serializer; this.recordWriter = serializer.createRecordWriter(endPoint); this.txnBatch = nextTxnBatch(recordWriter); + this.txnBatch.beginNextTransaction(); this.closed = false; this.lastUsed = System.currentTimeMillis(); } catch (InterruptedException e) { @@ -117,6 +118,10 @@ class HiveWriter { hearbeatNeeded = true; } + public int getRemainingTxns() { + return txnBatch.remainingTransactions(); + } + /** * Write data, update stats @@ -212,7 +217,7 @@ class HiveWriter { /** * Aborts the current Txn and switches to next Txn. 
- * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn + * @throws InterruptedException */ public void abort() throws InterruptedException { batch.clear(); @@ -332,8 +337,7 @@ class HiveWriter { return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block } }); - LOG.info("Acquired Txn Batch {}. Switching to first txn", batch); - batch.beginNextTransaction(); + LOG.info("Acquired Transaction batch {}", batch); } catch (Exception e) { throw new TxnBatchException(endPoint, e); } http://git-wip-us.apache.org/repos/asf/flume/blob/318da208/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java index 174f179..41bf0f6 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java @@ -174,6 +174,38 @@ public class TestHiveWriter { checkRecordCountInTable(3); } + @Test + public void testTxnBatchConsumption() throws Exception { + // get a small txn batch and consume it, then roll to new batch, verify + // the number of remaining txns to ensure Txns are not accidentally skipped + + HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); + + int txnPerBatch = 3; + + HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter); + + Assert.assertEquals(writer.getRemainingTxns(),2); + writer.flush(true); + + Assert.assertEquals(writer.getRemainingTxns(), 1); + writer.flush(true); +
Assert.assertEquals(writer.getRemainingTxns(), 0); + writer.flush(true); + + // flip over to next batch + Assert.assertEquals(writer.getRemainingTxns(), 2); + writer.flush(true); + + Assert.assertEquals(writer.getRemainingTxns(), 1); + + writer.close(); + + } + private void checkRecordCountInTable(int expectedCount) throws CommandNeedRetryException, IOException { int count = TestUtil.listRecordsInTable(driver, dbName, tblName).size();