crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject crunch git commit: CRUNCH-629: Kafka source pulling is aggressive
Date Tue, 03 Jan 2017 18:59:15 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 901d0644d -> 628098317


CRUNCH-629: Kafka source pulling is aggressive

Added some parentheses to force proper order of operations in KafkaRecordReader.

Signed-off-by: Micah Whitacre <mkwhit@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/62809831
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/62809831
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/62809831

Branch: refs/heads/master
Commit: 6280983179e9c690af69c2bf0e296b054122d724
Parents: 901d064
Author: Brian Tieman <brian.tieman@cerner.com>
Authored: Tue Dec 13 09:01:08 2016 -0600
Committer: Micah Whitacre <mkwhit@gmail.com>
Committed: Tue Jan 3 12:48:02 2017 -0600

----------------------------------------------------------------------
 .../apache/crunch/kafka/inputformat/KafkaRecordReader.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/62809831/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
index 3ed799b..0c49c66 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -163,11 +163,11 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
   }
 
   private Iterator<ConsumerRecord<K, V>> getRecords() {
-    if (recordIterator == null || !recordIterator.hasNext()) {
+    if ((recordIterator == null) || !recordIterator.hasNext()) {
       ConsumerRecords<K, V> records = null;
       int numTries = 0;
       boolean success = false;
-      while(!success && numTries < maxNumberAttempts) {
+      while(!success && (numTries < maxNumberAttempts)) {
         try {
           records = getConsumer().poll(consumerPollTimeout);
         } catch (RetriableException re) {
@@ -179,7 +179,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
             throw re;
           }
         }
-        if((records == null || records.isEmpty()) && hasPendingData()){
+        if(((records == null) || records.isEmpty()) && hasPendingData()){
           concurrentEmptyResponses++;
           LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}",
             concurrentEmptyResponses, maxConcurrentEmptyResponses);
@@ -189,7 +189,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
       }
       concurrentEmptyResponses = 0;
 
-      if(records == null || records.isEmpty()){
+      if((records == null) || records.isEmpty()){
         LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
       }else{
         LOG.info("Retrieved records from Kafka to iterate over.");


Mime
View raw message