beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: Add timeout to initialization of partition in KafkaIO
Date Mon, 10 Jul 2017 20:00:48 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 7f2419f09 -> 2351c7e33


Add timeout to initialization of partition in KafkaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38fc2b2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38fc2b2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38fc2b2e

Branch: refs/heads/release-2.1.0
Commit: 38fc2b2e76fd13c3edd304ffcb5378fb36ab48c0
Parents: 53b372b
Author: Raghu Angadi <rangadi@google.com>
Authored: Mon Jul 3 23:54:10 2017 -0700
Committer: Raghu Angadi <rangadi@google.com>
Committed: Thu Jul 6 22:02:40 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 81 +++++++++++++++-----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 30 ++++++++
 2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/38fc2b2e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 702bdd3..28262c9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,9 +49,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -1049,8 +1051,32 @@ public class KafkaIO {
       curBatch = Iterators.cycle(nonEmpty);
     }
 
+    private void setupInitialOffset(PartitionState pState) {
+      Read<K, V> spec = source.spec;
+
+      if (pState.nextOffset != UNINITIALIZED_OFFSET) {
+        consumer.seek(pState.topicPartition, pState.nextOffset);
+      } else {
+        // nextOffset is unininitialized here, meaning start reading from latest record as of now
+        // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
+        // Remember the current position without waiting until the first record is read. This
+        // ensures checkpoint is accurate even if the reader is closed before reading any records.
+        Instant startReadTime = spec.getStartReadTime();
+        if (startReadTime != null) {
+          pState.nextOffset =
+              consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime());
+          consumer.seek(pState.topicPartition, pState.nextOffset);
+        } else {
+          pState.nextOffset = consumer.position(pState.topicPartition);
+        }
+      }
+    }
+
     @Override
     public boolean start() throws IOException {
+      final int defaultPartitionInitTimeout = 60 * 1000;
+      final int kafkaRequestTimeoutMultiple = 2;
+
       Read<K, V> spec = source.spec;
       consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
       consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -1065,25 +1091,38 @@ public class KafkaIO {
       keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
       valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
 
-      for (PartitionState p : partitionStates) {
-        if (p.nextOffset != UNINITIALIZED_OFFSET) {
-          consumer.seek(p.topicPartition, p.nextOffset);
-        } else {
-          // nextOffset is unininitialized here, meaning start reading from latest record as of now
-          // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
-          // Remember the current position without waiting until the first record is read. This
-          // ensures checkpoint is accurate even if the reader is closed before reading any records.
-          Instant startReadTime = spec.getStartReadTime();
-          if (startReadTime != null) {
-            p.nextOffset =
-                consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime());
-            consumer.seek(p.topicPartition, p.nextOffset);
-          } else {
-            p.nextOffset = consumer.position(p.topicPartition);
+      // Seek to start offset for each partition. This is the first interaction with the server.
+      // Unfortunately it can block forever in case of network issues like incorrect ACLs.
+      // Initialize partition in a separate thread and cancel it if takes longer than a minute.
+      for (final PartitionState pState : partitionStates) {
+        Future<?> future =  consumerPollThread.submit(new Runnable() {
+          public void run() {
+            setupInitialOffset(pState);
           }
-        }
+        });
 
-        LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
+        try {
+          // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set.
+          Integer reqTimeout = (Integer) source.spec.getConsumerConfig().get(
+              ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+          future.get(reqTimeout != null ? kafkaRequestTimeoutMultiple * reqTimeout
+                         : defaultPartitionInitTimeout,
+                     TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          consumer.wakeup(); // This unblocks consumer stuck on network I/O.
+          // Likely reason : Kafka servers are configured to advertise internal ips, but
+          // those ips are not accessible from workers outside.
+          String msg = String.format(
+              "%s: Timeout while initializing partition '%s'. "
+                  + "Kafka client may not be able to connect to servers.",
+              this, pState.topicPartition);
+          LOG.error("{}", msg);
+          throw new IOException(msg);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+        LOG.info("{}: reading from {} starting at offset {}",
+                 name, pState.topicPartition, pState.nextOffset);
       }
 
       // Start consumer read loop.
@@ -1320,8 +1359,12 @@ public class KafkaIO {
       // might block to enqueue right after availableRecordsQueue.poll() below.
       while (!isShutdown) {
 
-        consumer.wakeup();
-        offsetConsumer.wakeup();
+        if (consumer != null) {
+          consumer.wakeup();
+        }
+        if (offsetConsumer != null) {
+          offsetConsumer.wakeup();
+        }
         availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
         try {
           isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/beam/blob/38fc2b2e/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index b69bc83..482f5a2 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -364,6 +365,35 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnreachableKafkaBrokers() {
+    // Expect an exception when the Kafka brokers are not reachable on the workers.
+    // We specify partitions explicitly so that splitting does not involve server interaction.
+    // Set request timeout to 10ms so that test does not take long.
+
+    thrown.expect(Exception.class);
+    thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
+
+    int numElements = 1000;
+    PCollection<Long> input = p
+        .apply(KafkaIO.<Integer, Long>read()
+            .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
+            .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
+            .withKeyDeserializer(IntegerDeserializer.class)
+            .withValueDeserializer(LongDeserializer.class)
+            .updateConsumerProperties(ImmutableMap.<String, Object>of(
+                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
+                ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
+                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
+                ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
+            .withMaxNumRecords(10)
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
   public void testUnboundedSourceWithSingleTopic() {
     // same as testUnboundedSource, but with single topic
 


Mime
View raw message