beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reuven...@apache.org
Subject [10/13] beam git commit: Close producers after one minute of inactivity. This closes producers when ranges move to other workers. Misc fixes around.
Date Tue, 17 Oct 2017 15:17:45 GMT
Close producers after one minute of inactivity.
This closes producers when ranges move to other workers.
Misc fixes around.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4f6105e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4f6105e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4f6105e

Branch: refs/heads/master
Commit: f4f6105e9ca7da0af938ba65544398be2f6fc4e8
Parents: 5767156
Author: Raghu Angadi <rangadi@google.com>
Authored: Fri Jul 28 17:39:17 2017 -0700
Committer: Raghu Angadi <rangadi@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 172 +++++++++++++------
 1 file changed, 115 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f4f6105e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 5d50cf7..63dc734 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -28,6 +28,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -1510,6 +1517,13 @@ public class KafkaIO {
       return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
     }
 
+    /**
+     * TODO: User friendly javadoc.
+     * Note on performance: Exactly-once sink involves two shuffles of input records in order to
+     * provide the right semantics. As a result, the input records go through 2
+     * serialization-deserialization cycles. Depending on volume and cost of serialization, the
+     * CPU cost might be noticeable. The cost could be minimized by writing byte arrays.
+     */
     public Write<K, V> withEOS() {
       return toBuilder().setEOS(true).build();
     }
@@ -1827,8 +1841,8 @@ public class KafkaIO {
       if (numShards <= 0) {
         try (Consumer<?, ?> consumer = openConsumer(spec)) {
           numShards = consumer.partitionsFor(spec.getTopic()).size();
-          LOG.info("Using {} shards for exactly-once, matching number of partitions for topic '{}'",
-                   numShards, spec.getTopic());
+          LOG.info("Using {} shards for exactly-once writer, matching number of partitions "
+                   + "for topic '{}'", numShards, spec.getTopic());
         }
       }
       checkState(numShards > 0, "Could not set number of shards");
@@ -1911,11 +1925,6 @@ public class KafkaIO {
 
     private final Write<K, V> spec;
 
-    // One cache for each sink (usually there is only one sink per pipeline
-    private static final Map<String, Map<Integer, ShardWriter<?, ?>>> CACHE_BY_GROUP_ID =
-        new HashMap<>();
-    // TODO: Need a way to close producers that are no longer relevant (may be have a timeout?).
-
     // Metrics
     private final Counter elementsWritten = SinkMetrics.elementsWritten();
     // Elements buffered due to out of order arrivals.
@@ -1942,13 +1951,19 @@ public class KafkaIO {
       long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
       long minBufferedId = MoreObjects.firstNonNull(minBufferedIdState.read(), Long.MAX_VALUE);
 
-      ShardWriter<K, V> writer = getShardWriter(shard, writerIdState, nextId);
+      ShardWriterCache<K, V> cache =
+          (ShardWriterCache<K, V>) CACHE_BY_GROUP_ID.getUnchecked(spec.getSinkGroupId());
+      ShardWriter<K, V> writer = cache.removeIfPresent(shard);
+      if (writer == null) {
+        writer = initShardWriter(shard, writerIdState, nextId);
+      }
+
       long committedId = writer.committedId;
 
       if (committedId >= nextId) {
         // This is a retry of an already committed batch.
         LOG.info("{}: committed id {} is ahead of expected {}. {} records will be dropped "
-                 + "(these are already written).",
+                     + "(these are already written).",
                  shard, committedId, nextId - 1, committedId - nextId + 1);
         nextId = committedId + 1;
       }
@@ -2007,7 +2022,7 @@ public class KafkaIO {
             List<KV<Long, KV<K, V>>> buffered = Lists.newArrayList(oooBufferState.read());
             Collections.sort(buffered, new KV.OrderByKey<Long, KV<K, V>>());
 
-            LOG.info("{} : merging {} buffered records with min buffered id",
+            LOG.info("{} : merging {} buffered records (min buffered id is {}).",
                      shard, buffered.size(), minBufferedId);
 
             oooBufferState.clear();
@@ -2021,25 +2036,31 @@ public class KafkaIO {
 
         writer.commitTxn(nextId - 1, numTransactions);
         nextIdState.write(nextId);
+
       } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
-        // JavaDoc says these are not recoverable errors and producer should be closed.
+        // Producer JavaDoc says these are not recoverable errors and producer should be closed.
 
-        // Close the producer and and remove it from the cache. A new producer will be created
-        // in retry. It is possible that a rough worker keeps retrying and ends up fencing off
+        // Close the producer and a new producer will be initialized in retry.
+        // It is possible that a rough worker keeps retrying and ends up fencing off
         // active producers. How likely this might be or how well such a scenario is handled
         // depends on the runner. For now we will leave it to upper layers, will need to revisit.
 
-        LOG.warn("{} : closing producer {} after unrecoverable error. The work might be migrated. "
-                 + "committed id {}, current id {}.",
+        LOG.warn("{} : closing producer {} after unrecoverable error. The work might have migrated."
+                     + " Committed id {}, current id {}.",
                  writer.shard, writer.producerName, writer.committedId, nextId - 1, e);
 
-        CACHE_BY_GROUP_ID.get(spec.getSinkGroupId()).remove(writer.shard);
         writer.producer.close();
+        writer = null; // No need to cache it.
         throw e;
+      } finally {
+        if (writer != null) {
+          cache.insert(shard, writer);
+        }
       }
     }
 
     private static class ShardMetadata {
+
       @JsonProperty("seq")
       public final long sequenceId;
       @JsonProperty("id")
@@ -2056,7 +2077,11 @@ public class KafkaIO {
       }
     }
 
+    /**
+     * A wrapper around Kafka producer. One for each of the shards.
+     */
     private static class ShardWriter<K, V> {
+
       private final int shard;
       private final String writerId;
       private final Producer<K, V> producer;
@@ -2102,8 +2127,8 @@ public class KafkaIO {
           producer.sendOffsetsToTransaction(
               ImmutableMap.of(new TopicPartition(spec.getTopic(), shard),
                               new OffsetAndMetadata(
-                                0L, JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId,
-                                                                                     writerId)))),
+                                  0L, JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId,
+                                                                                       writerId)))),
               spec.getSinkGroupId());
           producer.commitTransaction();
 
@@ -2118,39 +2143,18 @@ public class KafkaIO {
       }
     }
 
-    private ShardWriter<K, V> getShardWriter(int shard,
-                                             ValueState<String> writerIdState,
-                                             long nextId)  throws IOException {
-
-      Map<Integer, ShardWriter<?, ?>> cache;
-
-      synchronized (CACHE_BY_GROUP_ID) {
-        cache = CACHE_BY_GROUP_ID.get(spec.getSinkGroupId());
-        if (cache == null) {
-          cache = new HashMap<>();
-          CACHE_BY_GROUP_ID.put(spec.getSinkGroupId(), cache);
-        }
-      }
-
-      synchronized (cache) {
-        @SuppressWarnings("unchecked")
-        ShardWriter<K, V> shardWriter = (ShardWriter<K, V>) cache.get(shard);
-        if (shardWriter != null) {
-          // any sanity checks?
-          return shardWriter;
-        }
-      }
-
-      // initialize new shard
+    private ShardWriter<K, V> initShardWriter(int shard,
+                                              ValueState<String> writerIdState,
+                                              long nextId) throws IOException {
 
       String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
       Producer<K, V> producer = initializeEosProducer(spec, producerName);
 
-      String writerId = writerIdState.read();
-
       // Fetch latest committed metadata for the partition (if any). Checks committed sequence ids.
       try {
 
+        String writerId = writerIdState.read();
+
         OffsetAndMetadata committed;
 
         try (Consumer<?, ?> consumer = openConsumer(spec)) {
@@ -2161,7 +2165,7 @@ public class KafkaIO {
 
         if (committed == null || committed.metadata() == null || committed.metadata().isEmpty()) {
           checkState(nextId == 0 && writerId == null,
-                     "State exists for shard %d (nextId %s, writerId '%s'), but there is no state "
+                     "State exists for shard %s (nextId %s, writerId '%s'), but there is no state "
                          + "stored with Kafka topic '%s' group id '%s'",
                      shard, nextId, writerId, spec.getTopic(), spec.getSinkGroupId());
 
@@ -2191,9 +2195,9 @@ public class KafkaIO {
             // We could let users explicitly an option to override the existing metadata.
             //
             throw new IllegalStateException(String.format(
-              "Kafka metadata exists for shard %d, but there is no stored state for it. "
+              "Kafka metadata exists for shard %s, but there is no stored state for it. "
               + "This mostly indicates groupId '%s' is already used else where or in earlier runs. "
-              + "Try another group id. Metadata : '%s'.",
+              + "Try another group id. Metadata for this shard on Kafka : '%s'",
               shard, spec.getSinkGroupId(), committed.metadata()));
           }
 
@@ -2209,25 +2213,79 @@ public class KafkaIO {
                      nextId - 1, committed.metadata());
         }
 
-        ShardWriter<K, V> shardWriter = new ShardWriter<>(shard, writerId, producer, producerName,
-                                                          spec, committedSeqId);
-
-        synchronized (cache) {
-          checkState(cache.get(shard) == null,
-                     "Unexpected concurrent execution of shard %s", shard);
-          cache.put(shard, shardWriter);
-        }
-
         LOG.info("{} : initialized producer {} with committed sequence id {}",
                  shard, producerName, committedSeqId);
 
-        return shardWriter;
+        return new ShardWriter<>(shard, writerId, producer, producerName, spec, committedSeqId);
 
       } catch (Exception e) {
         producer.close();
         throw e;
       }
     }
+
+    /**
+     * A wrapper around guava cache to provide insert()/remove() semantics. A ShardWriter will
+     * be closed if it is stays in cache for more than 1 minute, i.e. not used inside EOSWrite
+     * DoFn for a minute or more.
+     */
+    private static class ShardWriterCache<K, V> {
+
+      static final ScheduledExecutorService SCHEDULED_CLEAN_UP_THREAD =
+          Executors.newSingleThreadScheduledExecutor();
+
+      static final int CLEAN_UP_CHECK_INTERVAL_MS = 10 * 1000;
+      static final int IDLE_TIMEOUT_MS = 60 * 1000;
+
+      private final Cache<Integer, ShardWriter<K, V>> cache;
+
+      ShardWriterCache() {
+        this.cache = CacheBuilder
+            .newBuilder()
+            .expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+            .removalListener(new RemovalListener<Integer, ShardWriter<K, V>>() {
+              @Override
+              public void onRemoval(RemovalNotification<Integer, ShardWriter<K, V>> notification) {
+                if (notification.getCause() != RemovalCause.EXPLICIT) {
+                  ShardWriter writer = notification.getValue();
+                  LOG.info("{} : Closing idle shard writer {} after 1 minute of idle time.",
+                           writer.shard, writer.producerName);
+                  writer.producer.close();
+                }
+              }
+            }).build();
+
+        // run cache.cleanUp() every 10 seconds.
+        SCHEDULED_CLEAN_UP_THREAD.scheduleAtFixedRate(
+            new Runnable() {
+              @Override
+              public void run() {
+                cache.cleanUp();
+              }
+            },
+            CLEAN_UP_CHECK_INTERVAL_MS, CLEAN_UP_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
+      }
+
+      ShardWriter<K, V> removeIfPresent(int shard) {
+        return cache.asMap().remove(shard);
+      }
+
+      void insert(int shard, ShardWriter<K, V> writer) {
+        ShardWriter<K, V> existing = cache.asMap().putIfAbsent(shard, writer);
+        checkState(existing == null,
+                   "Unexpected multiple instances of writers for shard %s", shard);
+      }
+    }
+
+    // One cache for each sink (usually there is only one sink per pipeline)
+    private static final LoadingCache<String, ShardWriterCache<?, ?>> CACHE_BY_GROUP_ID =
+        CacheBuilder.newBuilder()
+            .build(new CacheLoader<String, ShardWriterCache<?, ?>>() {
+              @Override
+              public ShardWriterCache<?, ?> load(String key) throws Exception {
+                return new ShardWriterCache<>();
+              }
+            });
   }
 
   /**


Mime
View raw message