gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-234] Add a ControlMessageInjector that generates metadata up…
Date Wed, 20 Sep 2017 21:25:10 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6160adca2 -> 99b715238


[GOBBLIN-234] Add a ControlMessageInjector that generates metadata up…

Closes #2107 from htran1/metadata_update


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/99b71523
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/99b71523
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/99b71523

Branch: refs/heads/master
Commit: 99b71523887c7d7f780ac78c20941093bd3751f5
Parents: 6160adc
Author: Hung Tran <hutran@linkedin.com>
Authored: Wed Sep 20 14:25:02 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Wed Sep 20 14:25:02 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  4 +-
 .../org/apache/gobblin/converter/Converter.java |  4 +-
 .../gobblin/stream/FlushControlMessage.java     | 18 +++---
 .../gobblin/converter/AsyncConverter1to1.java   |  2 +-
 .../writer/CloseOnFlushWriterWrapper.java       | 65 +++++++++++++++++---
 .../gobblin/writer/PartitionedDataWriter.java   | 13 +++-
 .../writer/CloseOnFlushWriterWrapperTest.java   | 36 +++++------
 .../gobblin/writer/PartitionedWriterTest.java   |  2 +-
 .../gobblin/runtime/TestRecordStream.java       | 62 ++++++++-----------
 9 files changed, 125 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index e54a54b..303ad15 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -342,6 +342,7 @@ public class ConfigurationKeys {
   public static final String SIMPLE_WRITER_DELIMITER = "simple.writer.delimiter";
   public static final String SIMPLE_WRITER_PREPEND_SIZE = "simple.writer.prepend.size";
 
+
   // Internal use only - used to send metadata to publisher
   public static final String WRITER_METADATA_KEY = WRITER_PREFIX + "._internal.metadata";
   public static final String WRITER_PARTITION_PATH_KEY = WRITER_PREFIX + "._internal.partition.path";
@@ -354,9 +355,6 @@ public class ConfigurationKeys {
   public static final String WRITER_BYTES_WRITTEN = WRITER_PREFIX + ".bytes.written";
   public static final String WRITER_EARLIEST_TIMESTAMP = WRITER_PREFIX + ".earliest.timestamp";
   public static final String WRITER_AVERAGE_TIMESTAMP = WRITER_PREFIX + ".average.timestamp";
-  // Used internally to enable closing of the writer on flush
-  public static final String WRITER_CLOSE_ON_FLUSH_KEY = WRITER_PREFIX + ".closeOnFlush";
-  public static final boolean DEFAULT_WRITER_CLOSE_ON_FLUSH = false;
 
   /**
    * Configuration properties used by the quality checker.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
index b4a45b7..514f5be 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
@@ -128,7 +128,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable,
FinalState
       WorkUnitState workUnitState) throws SchemaConversionException {
     init(workUnitState);
     this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(),
-        Optional.of(convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState))).build();
+        Optional.fromNullable(convertSchema(inputStream.getGlobalMetadata().getSchema(),
workUnitState))).build();
     Flowable<StreamEntity<DO>> outputStream =
         inputStream.getRecordStream()
             .flatMap(in -> {
@@ -141,7 +141,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable,
FinalState
                 if (in instanceof MetadataUpdateControlMessage) {
                   this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput(
                       ((MetadataUpdateControlMessage) in).getGlobalMetadata(),
-                      Optional.of(convertSchema((SI)((MetadataUpdateControlMessage) in).getGlobalMetadata()
+                      Optional.fromNullable(convertSchema((SI)((MetadataUpdateControlMessage)
in).getGlobalMetadata()
                           .getSchema(), workUnitState))).build();
                   out = new MetadataUpdateControlMessage<SO, DO>(this.outputGlobalMetadata);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java
b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java
index 8760754..7c23dd3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java
@@ -16,7 +16,9 @@
  */
 package org.apache.gobblin.stream;
 
+import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
@@ -25,21 +27,23 @@ import lombok.Getter;
  * Control message for flushing writers
  * @param <D>
  */
-@AllArgsConstructor
+@AllArgsConstructor(access= AccessLevel.PRIVATE)
 @EqualsAndHashCode
+@Builder
 public class FlushControlMessage<D> extends ControlMessage<D> {
   @Getter
-  private final FlushReason flushReason;
+  private final String flushReason;
+  @Getter
+  private final FlushType flushType;
 
   @Override
   protected StreamEntity<D> buildClone() {
-    return new FlushControlMessage(flushReason);
+    return FlushControlMessage.<D>builder().flushReason(this.flushReason).flushType(this.flushType).build();
   }
 
   @AllArgsConstructor
-  @EqualsAndHashCode
-  public static class FlushReason {
-    @Getter
-    private final String reason;
+  public enum FlushType {
+    FlUSH,
+    FLUSH_AND_CLOSE /* use this type to request a close after flush */
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
index b5092ef..6d56e8a 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
@@ -95,7 +95,7 @@ public abstract class AsyncConverter1to1<SI, SO, DI, DO> extends Converter<SI,
S
               }
             }, false, maxConcurrentAsyncConversions);
     return inputStream.withRecordStream(outputStream, GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(),
-        Optional.of(outputSchema)).build());
+        Optional.fromNullable(outputSchema)).build());
   }
 
   @RequiredArgsConstructor

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 5b30cba..66e5a26 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -29,6 +29,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.records.FlushControlMessageHandler;
+import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.FlushControlMessage;
+import org.apache.gobblin.stream.MetadataUpdateControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.Decorator;
 import org.apache.gobblin.util.FinalState;
@@ -40,6 +43,13 @@ import org.apache.gobblin.util.FinalState;
  * @param <D>
  */
 public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements
Decorator, FinalState, Retriable {
+  // Used internally to enable closing of the writer on flush
+  public static final String WRITER_CLOSE_ON_FLUSH_KEY = ConfigurationKeys.WRITER_PREFIX
+ ".closeOnFlush";
+  public static final boolean DEFAULT_WRITER_CLOSE_ON_FLUSH = false;
+
+  public static final String WRITER_CLOSE_ON_METADATA_UPDATE = ConfigurationKeys.WRITER_PREFIX
+ ".closeOnMetadataUpdate";
+  public static final boolean DEFAULT_CLOSE_ON_METADATA_UPDATE = true;
+
   private static final Logger LOG = LoggerFactory.getLogger(CloseOnFlushWriterWrapper.class);
 
   private final State state;
@@ -48,6 +58,8 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D>
implements De
   private boolean closed;
   // is the close functionality enabled?
   private final boolean closeOnFlush;
+  private final ControlMessageHandler controlMessageHandler;
+  private final boolean closeOnMetadataUpdate;
 
   public CloseOnFlushWriterWrapper(Supplier<DataWriter<D>> writerSupplier, State
state) {
     Preconditions.checkNotNull(state, "State is required.");
@@ -58,8 +70,12 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D>
implements De
     this.writer = writerSupplier.get();
     this.closed = false;
 
-    this.closeOnFlush = this.state.getPropAsBoolean(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY,
-        ConfigurationKeys.DEFAULT_WRITER_CLOSE_ON_FLUSH);
+    this.closeOnFlush = this.state.getPropAsBoolean(WRITER_CLOSE_ON_FLUSH_KEY,
+        DEFAULT_WRITER_CLOSE_ON_FLUSH);
+
+    this.controlMessageHandler = new CloseOnFlushWriterMessageHandler();
+    this.closeOnMetadataUpdate = this.state.getPropAsBoolean(WRITER_CLOSE_ON_METADATA_UPDATE,
+        DEFAULT_CLOSE_ON_METADATA_UPDATE);
   }
 
   @Override
@@ -129,13 +145,7 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D>
implements De
 
   @Override
   public ControlMessageHandler getMessageHandler() {
-    // if close on flush is configured then create a handler that will invoke the wrapper's
flush to perform close
-    // on flush operations, otherwise return the wrapped writer's handler.
-    if (this.closeOnFlush) {
-      return new FlushControlMessageHandler(this);
-    } else {
-      return this.writer.getMessageHandler();
-    }
+    return this.controlMessageHandler;
   }
 
   /**
@@ -144,12 +154,47 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D>
implements De
    */
   @Override
   public void flush() throws IOException {
+    flush(this.closeOnFlush);
+  }
+
+  private void flush(boolean close) throws IOException {
     this.writer.flush();
 
     // commit data then close the writer
-    if (this.closeOnFlush) {
+    if (close) {
       commit();
       close();
     }
   }
+
+  /**
+   * A {@link ControlMessageHandler} that handles closing on flush
+   */
+  private class CloseOnFlushWriterMessageHandler implements ControlMessageHandler {
+    @Override
+    public void handleMessage(ControlMessage message) {
+      ControlMessageHandler underlyingHandler = CloseOnFlushWriterWrapper.this.writer.getMessageHandler();
+
+      // let underlying writer handle the control messages first
+      underlyingHandler.handleMessage(message);
+
+      // Handle close after flush logic. The file is closed if requested by the flush or
the configuration.
+      if ((message instanceof FlushControlMessage &&
+          (CloseOnFlushWriterWrapper.this.closeOnFlush ||
+              ((FlushControlMessage) message).getFlushType() == FlushControlMessage.FlushType.FLUSH_AND_CLOSE))
||
+          (message instanceof MetadataUpdateControlMessage && CloseOnFlushWriterWrapper.this.closeOnMetadataUpdate))
{
+        try {
+          // avoid flushing again
+          if (underlyingHandler instanceof FlushControlMessageHandler) {
+            commit();
+            close();
+          } else {
+            flush(true);
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("Could not flush when handling FlushControlMessage",
e);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index a667b86..83dd074 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterD
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.source.extractor.CheckpointableWatermark;
 import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.MetadataUpdateControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.stream.StreamEntity;
 import org.apache.gobblin.util.AvroUtils;
@@ -68,8 +69,10 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D>
implements Fin
   private final Optional<WriterPartitioner> partitioner;
   private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters;
   private final Optional<PartitionAwareDataWriterBuilder> builder;
+  private final DataWriterBuilder writerBuilder;
   private final boolean shouldPartition;
   private final Closer closer;
+  private final ControlMessageHandler controlMessageHandler;
   private boolean isSpeculativeAttemptSafe;
   private boolean isWatermarkCapable;
 
@@ -79,6 +82,8 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D>
implements Fin
     this.isWatermarkCapable = true;
     this.baseWriterId = builder.getWriterId();
     this.closer = Closer.create();
+    this.writerBuilder = builder;
+    this.controlMessageHandler = new PartitionDataWriterMessageHandler();
     this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord,
DataWriter<D>>() {
       @Override
       public DataWriter<D> load(final GenericRecord key)
@@ -322,7 +327,7 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D>
implements Fin
 
   @Override
   public ControlMessageHandler getMessageHandler() {
-    return new PartitionDataWriterMessageHandler();
+    return this.controlMessageHandler;
   }
 
   /**
@@ -333,6 +338,12 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D>
implements Fin
     public void handleMessage(ControlMessage message) {
       StreamEntity.ForkCloner cloner = message.forkCloner();
 
+      // update the schema used to build writers
+      if (message instanceof MetadataUpdateControlMessage) {
+        PartitionedDataWriter.this.writerBuilder.withSchema(((MetadataUpdateControlMessage)
message)
+            .getGlobalMetadata().getSchema());
+      }
+
       for (DataWriter writer : PartitionedDataWriter.this.partitionWriters.asMap().values())
{
         ControlMessage cloned = (ControlMessage) cloner.getClone();
         writer.getMessageHandler().handleMessage(cloned);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
index 6006572..ef435b1 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.records.FlushControlMessageHandler;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.FlushControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -43,86 +44,85 @@ public class CloseOnFlushWriterWrapperTest {
     byte[] record = new byte[]{'a', 'b', 'c', 'd'};
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
+
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
 
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
     Assert.assertEquals(dummyWriters.get(0).closeCount, 0);
     Assert.assertFalse(dummyWriters.get(0).committed);
-    Assert.assertTrue(dummyWriters.get(0).handlerCalled);
+    Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
   }
 
   @Test
   public void testCloseOnFlushEnabled()
       throws IOException {
     WorkUnitState state = new WorkUnitState();
-    state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true");
     List<DummyWriter> dummyWriters = new ArrayList<>();
     CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters,
state);
 
     byte[] record = new byte[]{'a', 'b', 'c', 'd'};
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
 
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
     Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
     Assert.assertTrue(dummyWriters.get(0).committed);
-    // handler from CloseOnFlushWriterWrapper should have been called instead
-    Assert.assertFalse(dummyWriters.get(0).handlerCalled);
+    Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
   }
 
   @Test
   public void testWriteAfterFlush()
       throws IOException {
     WorkUnitState state = new WorkUnitState();
-    state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true");
     List<DummyWriter> dummyWriters = new ArrayList<>();
     CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters,
state);
 
     byte[] record = new byte[]{'a', 'b', 'c', 'd'};
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
 
     Assert.assertEquals(dummyWriters.size(), 1);
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
     Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
     Assert.assertTrue(dummyWriters.get(0).committed);
-    Assert.assertFalse(dummyWriters.get(0).handlerCalled);
+    Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
 
     Assert.assertEquals(dummyWriters.size(), 2);
     Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(1).flushCount, 1);
     Assert.assertEquals(dummyWriters.get(1).closeCount, 1);
     Assert.assertTrue(dummyWriters.get(1).committed);
-    Assert.assertFalse(dummyWriters.get(1).handlerCalled);
+    Assert.assertEquals(dummyWriters.get(1).handlerCalled, 1);
   }
 
   @Test
   public void testCloseAfterFlush()
       throws IOException {
     WorkUnitState state = new WorkUnitState();
-    state.getJobState().setProp(ConfigurationKeys.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true");
     List<DummyWriter> dummyWriters = new ArrayList<>();
     CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters,
state);
 
     byte[] record = new byte[]{'a', 'b', 'c', 'd'};
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
 
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
     Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
     Assert.assertTrue(dummyWriters.get(0).committed);
-    // handler from CloseOnFlushWriterWrapper should have been called instead
-    Assert.assertFalse(dummyWriters.get(0).handlerCalled);
+    Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
 
     writer.close();
 
@@ -148,7 +148,7 @@ public class CloseOnFlushWriterWrapperTest {
     private int flushCount = 0;
     private int closeCount = 0;
     private boolean committed = false;
-    private boolean handlerCalled = false;
+    private int handlerCalled = 0;
 
     DummyWriter() {
     }
@@ -190,10 +190,10 @@ public class CloseOnFlushWriterWrapperTest {
 
     @Override
     public ControlMessageHandler getMessageHandler() {
-      return new ControlMessageHandler() {
+      return new FlushControlMessageHandler(this) {
         @Override
         public void handleMessage(ControlMessage message) {
-          handlerCalled = true;
+          handlerCalled++;
           if (message instanceof FlushControlMessage) {
             flush();
           }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index c00c823..0dc9846 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -184,7 +184,7 @@ public class PartitionedWriterTest {
     String record2 = "123";
     writer.writeEnvelope(new RecordEnvelope(record2));
 
-    FlushControlMessage controlMessage = new FlushControlMessage<>(new FlushControlMessage.FlushReason("test"));
+    FlushControlMessage controlMessage = FlushControlMessage.builder().build();
     BasicAckableForTesting ackable = new BasicAckableForTesting();
 
     controlMessage.addCallBack(ackable);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/99b71523/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index d00938e..620e401 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -39,7 +39,6 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.converter.SchemaConversionException;
-import org.apache.gobblin.converter.SingleRecordIterable;
 import org.apache.gobblin.fork.IdentityForkOperator;
 import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.publisher.TaskPublisher;
@@ -47,6 +46,7 @@ import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
 import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.records.FlushControlMessageHandler;
 import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.source.extractor.Extractor;
@@ -97,8 +97,8 @@ public class TestRecordStream {
   @Test
   public void testFlushControlMessages() throws Exception {
     MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("a"),
-        new FlushControlMessage(new FlushControlMessage.FlushReason("flush1")), new RecordEnvelope<>("b"),
-        new FlushControlMessage(new FlushControlMessage.FlushReason("flush2"))});
+        FlushControlMessage.builder().flushReason("flush1").build(), new RecordEnvelope<>("b"),
+        FlushControlMessage.builder().flushReason("flush2").build()});
     MyConverter converter = new MyConverter();
     MyFlushDataWriter writer = new MyFlushDataWriter();
 
@@ -109,11 +109,12 @@ public class TestRecordStream {
     Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL);
 
     Assert.assertEquals(converter.records, Lists.newArrayList("a", "b"));
-    Assert.assertEquals(converter.messages, Lists.newArrayList(new FlushControlMessage(new
FlushControlMessage.FlushReason("flush1")),
-        new FlushControlMessage(new FlushControlMessage.FlushReason("flush2"))));
+    Assert.assertEquals(converter.messages, Lists.newArrayList(
+        FlushControlMessage.builder().flushReason("flush1").build(),
+        FlushControlMessage.builder().flushReason("flush2").build()));
 
     Assert.assertEquals(writer.records, Lists.newArrayList("a", "b"));
-    Assert.assertEquals(writer.messages, Lists.newArrayList("flush called", "flush called"));
+    Assert.assertEquals(writer.flush_messages, Lists.newArrayList("flush called", "flush
called"));
   }
 
   /**
@@ -187,7 +188,7 @@ public class TestRecordStream {
         new RecordEnvelope<>("schema:b"), new RecordEnvelope<>("schema1:c"),
new RecordEnvelope<>("schema2:d")});
     SchemaChangeDetectionInjector injector = new SchemaChangeDetectionInjector();
     SchemaAppendConverter converter = new SchemaAppendConverter();
-    MyDataWriter writer = new MyDataWriter();
+    MyDataWriter writer = new MyDataWriterWithSchemaCheck();
 
     Task task = setupTask(extractor, writer, Collections.EMPTY_LIST,
         Lists.newArrayList(injector, converter));
@@ -351,8 +352,9 @@ public class TestRecordStream {
   }
 
   static class MyDataWriter extends DataWriterBuilder<String, String> implements DataWriter<String>
{
-    private List<String> records = new ArrayList<>();
-    private List<ControlMessage<String>> messages = new ArrayList<>();
+    protected List<String> records = new ArrayList<>();
+    protected List<ControlMessage<String>> messages = new ArrayList<>();
+    protected String writerSchema;
 
     @Override
     public void write(String record) throws IOException {
@@ -382,6 +384,7 @@ public class TestRecordStream {
 
     @Override
     public DataWriter<String> build() throws IOException {
+      this.writerSchema = this.schema;
       return this;
     }
 
@@ -467,7 +470,7 @@ public class TestRecordStream {
       String recordSchema = inputRecordEnvelope.getRecord().split(":")[0];
 
       if (!recordSchema.equals(this.globalMetadata.getSchema())) {
-        return new SingleRecordIterable<>(new MetadataUpdateControlMessage<>(
+        return Lists.newArrayList(new MetadataUpdateControlMessage<>(
             GlobalMetadata.<String>builder().schema(recordSchema).build()));
       }
 
@@ -486,43 +489,26 @@ public class TestRecordStream {
     }
   }
 
-  static class MyFlushDataWriter extends DataWriterBuilder<String, String> implements
DataWriter<String> {
-    private List<String> records = new ArrayList<>();
-    private List<String> messages = new ArrayList<>();
-
-    @Override
-    public void write(String record) throws IOException {
-      this.records.add(record);
-    }
-
-    @Override
-    public void commit() throws IOException {}
-
-    @Override
-    public void cleanup() throws IOException {}
-
-    @Override
-    public long recordsWritten() {
-      return 0;
-    }
+  static class MyFlushDataWriter extends MyDataWriter {
+    private List<String> flush_messages = new ArrayList<>();
 
     @Override
-    public long bytesWritten() throws IOException {
-      return 0;
+    public ControlMessageHandler getMessageHandler() {
+      return new FlushControlMessageHandler(this);
     }
 
     @Override
-    public DataWriter<String> build() throws IOException {
-      return this;
+    public void flush() throws IOException {
+      flush_messages.add("flush called");
     }
+  }
 
+  static class MyDataWriterWithSchemaCheck extends MyDataWriter {
     @Override
-    public void close() throws IOException {}
+    public void write(String record) throws IOException {
+      super.write(record);
 
-    @Override
-    public void flush() throws IOException {
-      messages.add("flush called");
+      Assert.assertEquals(this.writerSchema, record.split(":")[1]);
     }
   }
-
 }


Mime
View raw message