gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-171] Add a writer wrapper that closes the wrapped writer and creates a new one
Date Sat, 29 Jul 2017 20:50:51 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 252300e99 -> 11646563a


[GOBBLIN-171] Add a writer wrapper that closes the wrapped writer and creates a new one

Closes #2027 from htran1/close_on_flush_writer


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/11646563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/11646563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/11646563

Branch: refs/heads/master
Commit: 11646563a7cb17931b68d7ad124727e6df4e5bdb
Parents: 252300e
Author: Hung Tran <hutran@linkedin.com>
Authored: Sat Jul 29 13:50:46 2017 -0700
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Sat Jul 29 13:50:46 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   3 +
 .../writer/InstrumentedDataWriterDecorator.java |   5 +-
 .../writer/CloseOnFlushWriterWrapper.java       | 146 +++++++++++++++++
 .../gobblin/writer/PartitionedDataWriter.java   |  33 +++-
 .../writer/CloseOnFlushWriterWrapperTest.java   | 160 +++++++++++++++++++
 5 files changed, 342 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java
index bb54b5d..5bb8460 100644
--- a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java
@@ -344,6 +344,9 @@ public class ConfigurationKeys {
   public static final String WRITER_BYTES_WRITTEN = WRITER_PREFIX + ".bytes.written";
   public static final String WRITER_EARLIEST_TIMESTAMP = WRITER_PREFIX + ".earliest.timestamp";
   public static final String WRITER_AVERAGE_TIMESTAMP = WRITER_PREFIX + ".average.timestamp";
+  // Used internally to enable closing of the writer on flush
+  public static final String WRITER_CLOSE_ON_FLUSH_KEY = WRITER_PREFIX + ".closeOnFlush";
+  public static final boolean DEFAULT_WRITER_CLOSE_ON_FLUSH = false;
 
   /**
    * Configuration properties used by the quality checker.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
b/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
index 5bd7b8a..3c63a8b 100644
--- a/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
+++ b/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
@@ -50,8 +50,9 @@ public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBa
     super(state, Optional.<Class<?>> of(DecoratorUtils.resolveUnderlyingObject(writer).getClass()));
     this.embeddedWriter = this.closer.register(writer);
     this.isEmbeddedInstrumented = Instrumented.isLineageInstrumented(writer);
-    if (this.embeddedWriter instanceof WatermarkAwareWriter) {
-      this.watermarkAwareWriter = Optional.of((WatermarkAwareWriter) this.embeddedWriter);
+    Object underlying = DecoratorUtils.resolveUnderlyingObject(embeddedWriter);
+    if (underlying instanceof WatermarkAwareWriter) {
+      this.watermarkAwareWriter = Optional.of((WatermarkAwareWriter) underlying);
     } else {
       this.watermarkAwareWriter = Optional.absent();
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java
new file mode 100644
index 0000000..746140a
--- /dev/null
+++ b/gobblin-core/src/main/java/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gobblin.writer;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.rholder.retry.RetryerBuilder;
+import com.google.common.base.Preconditions;
+
+import gobblin.configuration.ConfigurationKeys;
+import gobblin.configuration.State;
+import gobblin.records.ControlMessageHandler;
+import gobblin.stream.RecordEnvelope;
+import gobblin.util.Decorator;
+import gobblin.util.FinalState;
+
+/**
+ * The {@link CloseOnFlushWriterWrapper} closes the wrapped writer on flush and creates a new writer using a
+ * {@link Supplier} on the next write. After the writer is closed the reference is still available for inspection until
+ * a new writer is created on the next write.
+ *
+ * <p>Close-on-flush is controlled by {@link ConfigurationKeys#WRITER_CLOSE_ON_FLUSH_KEY} (default
+ * {@link ConfigurationKeys#DEFAULT_WRITER_CLOSE_ON_FLUSH}). When it is enabled, {@link #flush()} commits and then
+ * closes the current writer. {@link #close()} does not forward to a writer that is already closed, and
+ * {@link #commit()} does not forward to a writer that was already committed with nothing written since. A final
+ * close/commit issued after such a flush therefore does not close or commit the same writer a second time.
+ *
+ * @param <D> type of the records written
+ */
+public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements Decorator, FinalState, Retriable {
+  private static final Logger LOG = LoggerFactory.getLogger(CloseOnFlushWriterWrapper.class);
+
+  private final State state;
+  private DataWriter<D> writer;
+  private final Supplier<DataWriter<D>> writerSupplier;
+  // true once the current writer has been closed; the next write replaces it with a new writer
+  private boolean closed;
+  // true once the current writer has been committed and nothing has been written to it since
+  private boolean committed;
+  // is the close functionality enabled?
+  private final boolean closeOnFlush;
+
+  /**
+   * @param writerSupplier creates the wrapped writer; invoked once here and again on the first write after a close
+   * @param state configuration holding {@link ConfigurationKeys#WRITER_CLOSE_ON_FLUSH_KEY}; also used to build the
+   *              default retryer when the wrapped writer is not {@link Retriable}
+   */
+  public CloseOnFlushWriterWrapper(Supplier<DataWriter<D>> writerSupplier, State state) {
+    Preconditions.checkNotNull(state, "State is required.");
+    Preconditions.checkNotNull(writerSupplier, "Writer supplier is required.");
+
+    this.state = state;
+    this.writerSupplier = writerSupplier;
+
+    this.writer = writerSupplier.get();
+    this.closed = false;
+    this.committed = false;
+
+    this.closeOnFlush = this.state.getPropAsBoolean(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY,
+        ConfigurationKeys.DEFAULT_WRITER_CLOSE_ON_FLUSH);
+  }
+
+  /** @return the currently wrapped writer, which may already be closed until the next write replaces it */
+  @Override
+  public Object getDecoratedObject() {
+    return this.writer;
+  }
+
+  /**
+   * Writes the record to the wrapped writer. If the previous writer was closed, a new one is first obtained from
+   * the supplier.
+   */
+  @Override
+  public void writeEnvelope(RecordEnvelope<D> record) throws IOException {
+    // get a new writer if last one was closed
+    if (this.closed) {
+      this.writer = this.writerSupplier.get();
+      this.closed = false;
+    }
+    // the current writer now holds data that has not been committed
+    this.committed = false;
+    this.writer.writeEnvelope(record);
+  }
+
+  /** Closes the wrapped writer unless it has already been closed (e.g. by a close-on-flush). */
+  @Override
+  public void close() throws IOException {
+    if (!this.closed) {
+      this.writer.close();
+      this.closed = true;
+    }
+  }
+
+  /** Commits the wrapped writer unless it was already committed and nothing has been written to it since. */
+  @Override
+  public void commit() throws IOException {
+    if (!this.committed) {
+      this.writer.commit();
+      this.committed = true;
+    }
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    this.writer.cleanup();
+  }
+
+  /** Reports the count of the current wrapped writer only; writers replaced after an earlier close are not included. */
+  @Override
+  public long recordsWritten() {
+    return this.writer.recordsWritten();
+  }
+
+  /** Reports the bytes of the current wrapped writer only; writers replaced after an earlier close are not included. */
+  @Override
+  public long bytesWritten() throws IOException {
+    return this.writer.bytesWritten();
+  }
+
+  /**
+   * Delegates to the wrapped writer if it is {@link Retriable}, otherwise builds a retryer from the configuration.
+   */
+  @Override
+  public RetryerBuilder<Void> getRetryerBuilder() {
+    if (this.writer instanceof Retriable) {
+      return ((Retriable) this.writer).getRetryerBuilder();
+    }
+    return RetryWriter.createRetryBuilder(this.state);
+  }
+
+  /** @return a copy of the current wrapped writer's final state, or an empty state if it does not expose one */
+  @Override
+  public State getFinalState() {
+    State finalState = new State();
+
+    if (this.writer instanceof FinalState) {
+      finalState.addAll(((FinalState) this.writer).getFinalState());
+    } else {
+      LOG.warn("Wrapped writer does not implement FinalState: {}", this.writer.getClass());
+    }
+
+    return finalState;
+  }
+
+  @Override
+  public ControlMessageHandler getMessageHandler() {
+    return this.writer.getMessageHandler();
+  }
+
+  /**
+   * The writer will be flushed. It will also be committed and closed if configured to be closed on flush.
+   * A writer that is already closed is left untouched.
+   * @throws IOException if flushing, committing or closing the wrapped writer fails
+   */
+  @Override
+  public void flush() throws IOException {
+    // a closed writer has nothing left to flush (e.g. a second flush with no write in between)
+    if (this.closed) {
+      return;
+    }
+
+    this.writer.flush();
+
+    // commit data then close the writer
+    if (this.closeOnFlush) {
+      commit();
+      close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java
index 2cc8169..f5cb017 100644
--- a/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java
@@ -20,6 +20,7 @@ package gobblin.writer;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
@@ -81,8 +82,19 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D>
implements Fin
       @Override
       public DataWriter<D> load(final GenericRecord key)
           throws Exception {
+        /* wrap the data writer to allow the option to close the writer on flush */
         return PartitionedDataWriter.this.closer
-            .register(new InstrumentedPartitionedDataWriterDecorator<>(createPartitionWriter(key),
state, key));
+            .register(new InstrumentedPartitionedDataWriterDecorator<>(
+                new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>()
{
+                  @Override
+                  public DataWriter<D> get() {
+                    try {
+                      return createPartitionWriter(key);
+                    } catch (IOException e) {
+                      throw new RuntimeException("Error creating writer", e);
+                    }
+                  }
+                }, state), state, key));
       }
     });
 
@@ -106,9 +118,24 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D>
implements Fin
       }
     } else {
       this.shouldPartition = false;
-      DataWriter<D> dataWriter = builder.build();
+      // Support configuration to close the DataWriter on flush to allow publishing intermediate
results in a task
+      CloseOnFlushWriterWrapper closeOnFlushWriterWrapper =
+          new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>()
{
+            @Override
+            public DataWriter<D> get() {
+              try {
+                return builder.withWriterId(PartitionedDataWriter.this.baseWriterId + "_"
+                    + PartitionedDataWriter.this.writerIdSuffix++).build();
+              } catch (IOException e) {
+                throw new RuntimeException("Error creating writer", e);
+              }
+            }
+          }, state);
+      DataWriter<D> dataWriter = (DataWriter)closeOnFlushWriterWrapper.getDecoratedObject();
+
       InstrumentedDataWriterDecorator<D> writer =
-          this.closer.register(new InstrumentedDataWriterDecorator<>(dataWriter, state));
+          this.closer.register(new InstrumentedDataWriterDecorator<>(closeOnFlushWriterWrapper,
state));
+
       this.isSpeculativeAttemptSafe = this.isDataWriterForPartitionSafe(dataWriter);
       this.isWatermarkCapable = this.isDataWriterWatermarkCapable(dataWriter);
       this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, writer);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11646563/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java
b/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java
new file mode 100644
index 0000000..84a81ec
--- /dev/null
+++ b/gobblin-core/src/test/java/gobblin/writer/CloseOnFlushWriterWrapperTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gobblin.writer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import gobblin.configuration.ConfigurationKeys;
+import gobblin.configuration.WorkUnitState;
+import gobblin.stream.RecordEnvelope;
+
+/**
+ * Tests for {@link CloseOnFlushWriterWrapper}. The tests use an in-memory {@link DataWriter} that counts records and
+ * flushes and records whether it was committed or closed.
+ */
+public class CloseOnFlushWriterWrapperTest {
+
+  @Test
+  public void testCloseOnFlushDisabled()
+      throws IOException {
+    WorkUnitState state = new WorkUnitState();
+    List<DummyWriter> dummyWriters = new ArrayList<>();
+    CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+    byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+    writer.writeEnvelope(new RecordEnvelope<>(record));
+    writer.flush();
+
+    Assert.assertEquals(dummyWriters.size(), 1);
+    Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+    Assert.assertFalse(dummyWriters.get(0).closed);
+    Assert.assertFalse(dummyWriters.get(0).committed);
+  }
+
+  @Test
+  public void testWriteAfterFlushWithCloseOnFlushDisabled()
+      throws IOException {
+    WorkUnitState state = new WorkUnitState();
+    List<DummyWriter> dummyWriters = new ArrayList<>();
+    CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+    byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+    writer.writeEnvelope(new RecordEnvelope<>(record));
+    writer.flush();
+    writer.writeEnvelope(new RecordEnvelope<>(record));
+    writer.flush();
+
+    // without close-on-flush the same underlying writer keeps receiving records
+    Assert.assertEquals(dummyWriters.size(), 1);
+    Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 2);
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 2);
+    Assert.assertFalse(dummyWriters.get(0).closed);
+    Assert.assertFalse(dummyWriters.get(0).committed);
+  }
+
+  @Test
+  public void testCloseOnFlushEnabled()
+      throws IOException {
+    WorkUnitState state = new WorkUnitState();
+    state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    List<DummyWriter> dummyWriters = new ArrayList<>();
+    CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+    byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+    writer.writeEnvelope(new RecordEnvelope<>(record));
+    writer.flush();
+
+    Assert.assertEquals(dummyWriters.size(), 1);
+    Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+    Assert.assertTrue(dummyWriters.get(0).closed);
+    Assert.assertTrue(dummyWriters.get(0).committed);
+  }
+
+  @Test
+  public void testWriteAfterFlush()
+      throws IOException {
+    WorkUnitState state = new WorkUnitState();
+    state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    List<DummyWriter> dummyWriters = new ArrayList<>();
+    CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+    byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+    writer.writeEnvelope(new RecordEnvelope<>(record));
+    writer.flush();
+
+    Assert.assertEquals(dummyWriters.size(), 1);
+    Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+    Assert.assertTrue(dummyWriters.get(0).closed);
+    Assert.assertTrue(dummyWriters.get(0).committed);
+
+    // the first write after a close-on-flush must go to a freshly created writer
+    writer.writeEnvelope(new RecordEnvelope<>(record));
+    writer.flush();
+
+    Assert.assertEquals(dummyWriters.size(), 2);
+    Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1);
+    Assert.assertEquals(dummyWriters.get(1).flushCount, 1);
+    Assert.assertTrue(dummyWriters.get(1).closed);
+    Assert.assertTrue(dummyWriters.get(1).committed);
+  }
+
+  /**
+   * Builds a wrapper whose supplier creates a new {@link DummyWriter} on each call. Every created writer is appended
+   * to {@code dummyWriters} so tests can inspect it.
+   */
+  private CloseOnFlushWriterWrapper<byte[]> getCloseOnFlushWriter(List<DummyWriter> dummyWriters,
+      WorkUnitState state) {
+    Supplier<DataWriter<byte[]>> supplier = () -> {
+      DummyWriter writer = new DummyWriter();
+      dummyWriters.add(writer);
+      return writer;
+    };
+    return new CloseOnFlushWriterWrapper<>(supplier, state.getJobState());
+  }
+
+  /** In-memory writer that only counts records and flushes and records commit/close calls. */
+  private static class DummyWriter implements DataWriter<byte[]> {
+    private int recordsSeen = 0;
+    private int flushCount = 0;
+    private boolean committed = false;
+    private boolean closed = false;
+
+    @Override
+    public void write(byte[] record)
+        throws IOException {
+      this.recordsSeen++;
+    }
+
+    @Override
+    public void commit()
+        throws IOException {
+      this.committed = true;
+    }
+
+    @Override
+    public void cleanup()
+        throws IOException {
+    }
+
+    @Override
+    public long recordsWritten() {
+      return this.recordsSeen;
+    }
+
+    @Override
+    public long bytesWritten()
+        throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      this.closed = true;
+    }
+
+    @Override
+    public void flush() {
+      this.flushCount++;
+    }
+  }
+}


Mime
View raw message