gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-236] Add a ControlMessage injector as a RecordStreamProcessor
Date Thu, 14 Sep 2017 19:54:30 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 14c1b03bb -> 3a035737f


[GOBBLIN-236] Add a ControlMessage injector as a RecordStreamProcessor

Closes #2090 from htran1/control_message_injection


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3a035737
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3a035737
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3a035737

Branch: refs/heads/master
Commit: 3a035737f72ce2bffbbe58beac2044b3b46da992
Parents: 14c1b03
Author: Hung Tran <hutran@linkedin.com>
Authored: Thu Sep 14 12:54:23 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Thu Sep 14 12:54:23 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   1 +
 .../org/apache/gobblin/converter/Converter.java |  29 ++-
 .../java/org/apache/gobblin/fork/Forker.java    |   6 +-
 .../apache/gobblin/metadata/GlobalMetadata.java |  69 +++++++
 .../records/RecordStreamWithMetadata.java       |  20 +-
 .../gobblin/source/extractor/Extractor.java     |   3 +-
 .../gobblin/stream/ControlMessageInjector.java  | 143 ++++++++++++++
 .../stream/MetadataUpdateControlMessage.java    |  41 ++++
 .../apache/gobblin/converter/ConverterTest.java |  13 +-
 .../org/apache/gobblin/fork/ForkerTest.java     |   4 +-
 .../gobblin/converter/AsyncConverter1to1.java   |  16 +-
 .../extractor/InstrumentedExtractorBase.java    |   4 +-
 .../converter/AsyncConverter1to1Test.java       |   7 +-
 .../gobblin/runtime/StreamModelTaskRunner.java  |  23 ++-
 .../java/org/apache/gobblin/runtime/Task.java   |  50 +++--
 .../org/apache/gobblin/runtime/TaskContext.java |  57 ++++++
 .../gobblin/runtime/TestRecordStream.java       | 188 ++++++++++++++++++-
 17 files changed, 628 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c986fd6..32d4729 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -147,6 +147,7 @@ public class ConfigurationKeys {
   public static final String TASK_DATA_ROOT_DIR_KEY = "task.data.root.dir";
   public static final String SOURCE_CLASS_KEY = "source.class";
   public static final String CONVERTER_CLASSES_KEY = "converter.classes";
+  public static final String RECORD_STREAM_PROCESSOR_CLASSES_KEY = "recordStreamProcessor.classes";
   public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class";
   public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
   public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
index 82cf10e..b4a45b7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
@@ -25,15 +25,19 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.initializer.ConverterInitializer;
 import org.apache.gobblin.converter.initializer.NoopConverterInitializer;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.stream.MetadataUpdateControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.stream.StreamEntity;
 import org.apache.gobblin.util.FinalState;
 
+import com.google.common.base.Optional;
+
 import io.reactivex.Flowable;
 
 
@@ -55,6 +59,9 @@ import io.reactivex.Flowable;
  * @param <DO> output data type
  */
 public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState, RecordStreamProcessor<SI, SO, DI, DO> {
+  // Metadata containing the output schema. This may be changed when a MetadataUpdateControlMessage is received.
+  private GlobalMetadata<SO> outputGlobalMetadata;
+
   /**
    * Initialize this {@link Converter}.
    *
@@ -120,16 +127,30 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
   public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> inputStream,
       WorkUnitState workUnitState) throws SchemaConversionException {
     init(workUnitState);
-    SO outputSchema = convertSchema(inputStream.getSchema(), workUnitState);
+    this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(),
+        Optional.of(convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState))).build();
     Flowable<StreamEntity<DO>> outputStream =
         inputStream.getRecordStream()
             .flatMap(in -> {
               if (in instanceof ControlMessage) {
+                ControlMessage out = (ControlMessage) in;
+
                 getMessageHandler().handleMessage((ControlMessage) in);
-                return Flowable.just(((ControlMessage<DO>) in));
+
+                // update the output schema with the new input schema from the MetadataUpdateControlMessage
+                if (in instanceof MetadataUpdateControlMessage) {
+                  this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput(
+                      ((MetadataUpdateControlMessage) in).getGlobalMetadata(),
+                      Optional.of(convertSchema((SI)((MetadataUpdateControlMessage) in).getGlobalMetadata()
+                          .getSchema(), workUnitState))).build();
+                  out = new MetadataUpdateControlMessage<SO, DO>(this.outputGlobalMetadata);
+                }
+
+                return Flowable.just(((ControlMessage<DO>) out));
               } else if (in instanceof RecordEnvelope) {
                 RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in;
-                Iterator<DO> convertedIterable = convertRecord(outputSchema, recordEnvelope.getRecord(), workUnitState).iterator();
+                Iterator<DO> convertedIterable = convertRecord(this.outputGlobalMetadata.getSchema(),
+                    recordEnvelope.getRecord(), workUnitState).iterator();
 
                 if (!convertedIterable.hasNext()) {
                   // if the iterable is empty, ack the record, return an empty flowable
@@ -153,7 +174,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
               }
             }, 1);
     outputStream = outputStream.doOnComplete(this::close);
-    return inputStream.withRecordStream(outputStream, outputSchema);
+    return inputStream.withRecordStream(outputStream, this.outputGlobalMetadata);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java b/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java
index d70e5ba..d077f78 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -59,7 +60,7 @@ public class Forker {
     workUnitState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, branches);
 
     forkOperator.init(workUnitState);
-    List<Boolean> forkedSchemas = forkOperator.forkSchema(workUnitState, inputStream.getSchema());
+    List<Boolean> forkedSchemas = forkOperator.forkSchema(workUnitState, inputStream.getGlobalMetadata().getSchema());
     int activeForks = (int) forkedSchemas.stream().filter(b -> b).count();
 
     Preconditions.checkState(forkedSchemas.size() == branches, String
@@ -90,7 +91,8 @@ public class Forker {
         Flowable<StreamEntity<D>> thisStream =
             forkedStream.filter(new ForkFilter<>(idx)).map(RecordWithForkMap::getRecordCopyIfNecessary);
         forkStreams.add(inputStream.withRecordStream(thisStream,
-            mustCopy ? (S) CopyHelper.copy(inputStream.getSchema()) : inputStream.getSchema()));
+            mustCopy ? (GlobalMetadata<S>) CopyHelper.copy(inputStream.getGlobalMetadata()) :
+                inputStream.getGlobalMetadata()));
       } else {
         forkStreams.add(null);
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java b/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java
new file mode 100644
index 0000000..48a6943
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/metadata/GlobalMetadata.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metadata;
+
+import org.apache.gobblin.fork.CopyHelper;
+import org.apache.gobblin.fork.CopyNotSupportedException;
+import org.apache.gobblin.fork.Copyable;
+
+import com.google.common.base.Optional;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+
+/**
+ * Global metadata
+ * @param <S> schema type
+ */
+@AllArgsConstructor(access=AccessLevel.PRIVATE)
+@EqualsAndHashCode
+@Builder
+public class GlobalMetadata<S> implements Copyable<GlobalMetadata<S>> {
+  @Getter
+  private S schema;
+
+  @Override
+  public GlobalMetadata<S> copy() throws CopyNotSupportedException {
+    if (CopyHelper.isCopyable(schema)) {
+      return new GlobalMetadata((S)CopyHelper.copy(schema));
+    }
+
+    throw new CopyNotSupportedException("Type is not copyable: " + schema.getClass().getName());
+  }
+
+  /**
+   * Builder that takes in an input {@GlobalMetadata} to use as a base.
+   * @param inputMetadata input metadata
+   * @param outputSchema output schema to set in the builder
+   * @param <SI> input schema type
+   * @param <SO> output schema type
+   * @return builder
+   */
+  public static <SI, SO> GlobalMetadataBuilder<SO> builderWithInput(GlobalMetadata<SI> inputMetadata, Optional<SO> outputSchema) {
+    GlobalMetadataBuilder<SO> builder = (GlobalMetadataBuilder<SO>) builder();
+
+    if (outputSchema.isPresent()) {
+      builder.schema(outputSchema.get());
+    }
+
+    return builder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java b/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java
index 57c1e7c..c5dda8f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/records/RecordStreamWithMetadata.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.records;
 
 import java.util.function.Function;
 
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.stream.StreamEntity;
 
@@ -34,20 +35,29 @@ import lombok.Data;
 @Data
 public class RecordStreamWithMetadata<D, S> {
   private final Flowable<StreamEntity<D>> recordStream;
-  private final S schema;
+  private final GlobalMetadata<S> globalMetadata;
 
   /**
    * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} but same schema.
    */
   public <DO> RecordStreamWithMetadata<DO, S> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream) {
-    return withRecordStream(newRecordStream, this.schema);
+    return withRecordStream(newRecordStream, this.globalMetadata);
   }
 
   /**
-   * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #schema}.
+   * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #globalMetadata}.
    */
+  @Deprecated
   public <DO, SO> RecordStreamWithMetadata<DO, SO> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream, SO newSchema) {
-    return new RecordStreamWithMetadata<>(newRecordStream, newSchema);
+    return new RecordStreamWithMetadata<>(newRecordStream, GlobalMetadata.<SO>builder().schema(newSchema).build());
+  }
+
+  /**
+   * @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #globalMetadata}.
+   */
+  public <DO, SO> RecordStreamWithMetadata<DO, SO> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream,
+      GlobalMetadata<SO> newGlobalMetadata) {
+    return new RecordStreamWithMetadata<>(newRecordStream, newGlobalMetadata);
   }
 
   /**
@@ -56,7 +66,7 @@ public class RecordStreamWithMetadata<D, S> {
    */
   public <DO> RecordStreamWithMetadata<DO, S>
       mapStream(Function<? super Flowable<StreamEntity<D>>, ? extends Flowable<StreamEntity<DO>>> transform) {
-    return new RecordStreamWithMetadata<>(transform.apply(this.recordStream), this.schema);
+    return new RecordStreamWithMetadata<>(transform.apply(this.recordStream), this.globalMetadata);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
index 22b1670..9749795 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.stream.StreamEntity;
@@ -128,7 +129,7 @@ public interface Extractor<S, D> extends Closeable {
       }
     });
     recordStream = recordStream.doFinally(this::close);
-    return new RecordStreamWithMetadata<>(recordStream, schema);
+    return new RecordStreamWithMetadata<>(recordStream, GlobalMetadata.<S>builder().schema(schema).build());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java
new file mode 100644
index 0000000..93ac0ae
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/ControlMessageInjector.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.stream;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.GlobalMetadata;
+import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.records.RecordStreamProcessor;
+import org.apache.gobblin.records.RecordStreamWithMetadata;
+
+import io.reactivex.Flowable;
+
+/**
+ * A {@link RecordStreamProcessor} that inspects an input record and outputs control messages before, after, or around
+ * the input record
+ * @param <SI> schema type; passed through unchanged
+ * @param <DI> data type; passed through unchanged
+ */
+public abstract class ControlMessageInjector<SI, DI> implements Closeable,
+        RecordStreamProcessor<SI, SI, DI, DI> {
+
+  /**
+   * Initialize this {@link ControlMessageInjector}. Invoked at the start of {@link #processStream}.
+   *
+   * @param workUnitState a {@link WorkUnitState} object carrying configuration properties
+   * @return an initialized {@link ControlMessageInjector} instance
+   */
+  protected ControlMessageInjector<SI, DI> init(WorkUnitState workUnitState) {
+    return this;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
+   * Set the global metadata of the input messages. Invoked with the stream's initial metadata and again for every
+   * {@link MetadataUpdateControlMessage}; the base implementation is empty and subclasses override it to store it.
+   * @param inputGlobalMetadata the global metadata for input messages
+   * @param workUnitState the {@link WorkUnitState} passed to {@link #processStream}
+   */
+  protected void setInputGlobalMetadata(GlobalMetadata<SI> inputGlobalMetadata, WorkUnitState workUnitState) {
+  }
+
+  /**
+   * Inject {@link ControlMessage}s before the record
+   * @param inputRecordEnvelope the record being inspected
+   * @param workUnitState the {@link WorkUnitState} passed to {@link #processStream}
+   * @return The {@link ControlMessage}s to inject before the record, or {@code null} to inject nothing
+   */
+  protected abstract Iterable<ControlMessage<DI>> injectControlMessagesBefore(RecordEnvelope<DI> inputRecordEnvelope,
+      WorkUnitState workUnitState);
+
+  /**
+   * Inject {@link ControlMessage}s after the record
+   * @param inputRecordEnvelope the record being inspected
+   * @param workUnitState the {@link WorkUnitState} passed to {@link #processStream}
+   * @return The {@link ControlMessage}s to inject after the record, or {@code null} to inject nothing
+   */
+  protected abstract Iterable<ControlMessage<DI>> injectControlMessagesAfter(RecordEnvelope<DI> inputRecordEnvelope,
+      WorkUnitState workUnitState);
+
+  /**
+   * Apply injections to the input {@link RecordStreamWithMetadata}.
+   * {@link ControlMessage}s may be injected before, after, or around the input record.
+   * A {@link MetadataUpdateControlMessage} will update the current input {@link GlobalMetadata} and pass the
+   * updated input {@link GlobalMetadata} to the next processor to propagate the metadata update down the pipeline.
+   */
+  @Override
+  public RecordStreamWithMetadata<DI, SI> processStream(RecordStreamWithMetadata<DI, SI> inputStream,
+      WorkUnitState workUnitState) throws StreamProcessingException {
+    init(workUnitState);
+
+    setInputGlobalMetadata(inputStream.getGlobalMetadata(), workUnitState);
+
+    Flowable<StreamEntity<DI>> outputStream =
+        inputStream.getRecordStream()
+            .flatMap(in -> {
+              if (in instanceof ControlMessage) {
+                if (in instanceof MetadataUpdateControlMessage) {
+                  setInputGlobalMetadata(((MetadataUpdateControlMessage) in).getGlobalMetadata(), workUnitState);
+                }
+
+                getMessageHandler().handleMessage((ControlMessage) in);
+                return Flowable.just(in);
+              } else if (in instanceof RecordEnvelope) {
+                RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in;
+                Iterable<ControlMessage<DI>> injectedBeforeIterable =
+                    injectControlMessagesBefore(recordEnvelope, workUnitState);
+                Iterable<ControlMessage<DI>> injectedAfterIterable =
+                    injectControlMessagesAfter(recordEnvelope, workUnitState);
+
+                if (injectedBeforeIterable == null && injectedAfterIterable == null) {
+                  // nothing injected so return the record envelope
+                  return Flowable.just(recordEnvelope);
+                } else {
+                  Flowable<StreamEntity<DI>> flowable;
+
+                  if (injectedBeforeIterable != null) {
+                    flowable = Flowable.<StreamEntity<DI>>fromIterable(injectedBeforeIterable)
+                        .concatWith(Flowable.just(recordEnvelope));
+                  } else {
+                    flowable = Flowable.just(recordEnvelope);
+                  }
+
+                  if (injectedAfterIterable != null) {
+                    flowable = flowable.concatWith(Flowable.fromIterable(injectedAfterIterable)); // keep the result
+                  }
+                  return flowable;
+                }
+              } else {
+                throw new UnsupportedOperationException();
+              }
+            }, 1);
+    outputStream = outputStream.doOnComplete(this::close);
+    return inputStream.withRecordStream(outputStream, inputStream.getGlobalMetadata());
+  }
+
+  /**
+   * @return {@link ControlMessageHandler} to call for each {@link ControlMessage} received.
+   */
+  protected ControlMessageHandler getMessageHandler() {
+    return ControlMessageHandler.NOOP;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java
new file mode 100644
index 0000000..7edd991
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.stream;
+
+import org.apache.gobblin.metadata.GlobalMetadata;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+
+/**
+ * Control message for updating the {@link GlobalMetadata} used for processing the records that follow it
+ * @param <S> schema type
+ * @param <D> data type
+ */
+@AllArgsConstructor
+@EqualsAndHashCode
+public class MetadataUpdateControlMessage<S, D> extends ControlMessage<D> {
+  @Getter
+  private final GlobalMetadata<S> globalMetadata;
+
+  @Override
+  protected StreamEntity<D> buildClone() {
+    return new MetadataUpdateControlMessage<S, D>(this.globalMetadata); // clone shares the same metadata instance
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
index f5f0ed5..00ac4f9 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.gobblin.ack.BasicAckableForTesting;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -42,7 +43,8 @@ public class ConverterTest {
     BasicAckableForTesting ackable = new BasicAckableForTesting();
 
     RecordStreamWithMetadata<Integer, String> stream =
-        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(0)), "schema").mapRecords(r -> {
+        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(0)),
+            GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> {
           r.addCallBack(ackable);
           return r;
         });
@@ -60,7 +62,8 @@ public class ConverterTest {
     BasicAckableForTesting ackable = new BasicAckableForTesting();
 
     RecordStreamWithMetadata<Integer, String> stream =
-        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1)), "schema").mapRecords(r -> {
+        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1)),
+            GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> {
           r.addCallBack(ackable);
           return r;
         });
@@ -81,7 +84,8 @@ public class ConverterTest {
     BasicAckableForTesting ackable = new BasicAckableForTesting();
 
     RecordStreamWithMetadata<Integer, String> stream =
-        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(2)), "schema").mapRecords(r -> {
+        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(2)),
+            GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> {
           r.addCallBack(ackable);
           return r;
         });
@@ -105,7 +109,8 @@ public class ConverterTest {
     BasicAckableForTesting ackable = new BasicAckableForTesting();
 
     RecordStreamWithMetadata<Integer, String> stream =
-        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1), new MyControlMessage<>()), "schema").mapRecords(r -> {
+        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1), new MyControlMessage<>()),
+            GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> {
           r.addCallBack(ackable);
           return r;
         });

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java b/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java
index 1c8beb1..30125f3 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/fork/ForkerTest.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.runtime.BasicTestControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -48,7 +49,8 @@ public class ForkerTest {
     Forker forker = new Forker();
     MyFlowable<StreamEntity<byte[]>> flowable = new MyFlowable<>();
 
-    RecordStreamWithMetadata<byte[], String> stream = new RecordStreamWithMetadata<>(flowable, "schema");
+    RecordStreamWithMetadata<byte[], String> stream =
+        new RecordStreamWithMetadata<>(flowable, GlobalMetadata.<String>builder().schema("schema").build());
 
     WorkUnitState workUnitState = new WorkUnitState();
     workUnitState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, "3");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
index 7eb1563..b5092ef 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AsyncConverter1to1.java
@@ -19,9 +19,12 @@ package org.apache.gobblin.converter;
 
 import java.util.concurrent.CompletableFuture;
 
+import com.google.common.base.Optional;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -64,12 +67,20 @@ public abstract class AsyncConverter1to1<SI, SO, DI, DO> extends Converter<SI, S
   protected abstract CompletableFuture<DO> convertRecordAsync(SO outputSchema, DI inputRecord, WorkUnitState workUnit)
       throws DataConversionException;
 
+  /**
+   * Return a {@link RecordStreamWithMetadata} with the appropriate modifications.
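+   * @param inputStream the input record stream together with its {@link GlobalMetadata}
+   * @param workUnitState the work unit state passed to schema and record conversion
+   * @return the converted record stream, carrying the converted output schema in its {@link GlobalMetadata}
+   * @throws SchemaConversionException if the input schema cannot be converted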
+   * @implNote this processStream does not handle {@link org.apache.gobblin.stream.MetadataUpdateControlMessage}s
+   */
   @Override
   public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> inputStream,
       WorkUnitState workUnitState) throws SchemaConversionException {
     int maxConcurrentAsyncConversions = workUnitState.getPropAsInt(MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY,
         DEFAULT_MAX_CONCURRENT_ASYNC_CONVERSIONS);
-    SO outputSchema = convertSchema(inputStream.getSchema(), workUnitState);
+    SO outputSchema = convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState);
     Flowable<StreamEntity<DO>> outputStream =
         inputStream.getRecordStream()
             .flatMapSingle(in -> {
@@ -83,7 +94,8 @@ public abstract class AsyncConverter1to1<SI, SO, DI, DO> extends Converter<SI, S
                 throw new IllegalStateException("Expected ControlMessage or RecordEnvelope.");
               }
             }, false, maxConcurrentAsyncConversions);
-    return inputStream.withRecordStream(outputStream, outputSchema);
+    return inputStream.withRecordStream(outputStream, GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(),
+        Optional.of(outputSchema)).build());
   }
 
   @RequiredArgsConstructor

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
index 35de712..1dcb1d1 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.MetricNames;
@@ -190,7 +191,8 @@ public abstract class InstrumentedExtractorBase<S, D>
       }
     });
     recordStream = recordStream.doFinally(this::close);
-    return new RecordStreamWithMetadata<>(recordStream, schema);
+    return new RecordStreamWithMetadata<>(recordStream, GlobalMetadata.<S>builder().schema(schema).build());
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java
index 01ec3b4..77ae3ab 100644
--- a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java
+++ b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/AsyncConverter1to1Test.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.ExponentialBackoff;
@@ -52,7 +53,8 @@ public class AsyncConverter1to1Test {
     workUnitState.setProp(AsyncConverter1to1.MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY, 3);
 
     RecordStreamWithMetadata<String, String> stream =
-        new RecordStreamWithMetadata<>(Flowable.range(0, 5).map(i -> i.toString()).map(RecordEnvelope::new), "schema");
+        new RecordStreamWithMetadata<>(Flowable.range(0, 5).map(i -> i.toString()).map(RecordEnvelope::new),
+            GlobalMetadata.<String>builder().schema("schema").build());
 
     Set<String> outputRecords = Sets.newConcurrentHashSet();
 
@@ -106,7 +108,8 @@ public class AsyncConverter1to1Test {
     workUnitState.setProp(AsyncConverter1to1.MAX_CONCURRENT_ASYNC_CONVERSIONS_KEY, 3);
 
     RecordStreamWithMetadata<String, String> stream =
-        new RecordStreamWithMetadata<>(Flowable.just("0", MyAsyncConverter1to1.FAIL, "1").map(RecordEnvelope::new), "schema");
+        new RecordStreamWithMetadata<>(Flowable.just("0", MyAsyncConverter1to1.FAIL, "1").map(RecordEnvelope::new),
+            GlobalMetadata.<String>builder().schema("schema").build());
 
     Set<String> outputRecords = Sets.newConcurrentHashSet();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index 2b0b47b..7713b79 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.fork.ForkOperator;
 import org.apache.gobblin.fork.Forker;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
+import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.runtime.fork.Fork;
 import org.apache.gobblin.source.extractor.Extractor;
@@ -61,6 +63,7 @@ public class StreamModelTaskRunner {
   private final TaskContext taskContext;
   private final Extractor extractor;
   private final Converter converter;
+  private final List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors;
   private final RowLevelPolicyChecker rowChecker;
   private final TaskExecutor taskExecutor;
   private final ExecutionModel taskMode;
@@ -102,13 +105,21 @@ public class StreamModelTaskRunner {
         return r;
       });
     }
-    if (this.converter instanceof MultiConverter) {
-      // if multiconverter, unpack it
-      for (Converter cverter : ((MultiConverter) this.converter).getConverters()) {
-        stream = cverter.processStream(stream, this.taskState);
+
+    // Use the recordStreamProcessor list if it is configured. It may contain any RecordStreamProcessor type, including converters
+    if (!this.recordStreamProcessors.isEmpty()) {
+      for (RecordStreamProcessor streamProcessor : this.recordStreamProcessors) {
+        stream = streamProcessor.processStream(stream, this.taskState);
       }
     } else {
-      stream = this.converter.processStream(stream, this.taskState);
+      if (this.converter instanceof MultiConverter) {
+        // if multiconverter, unpack it
+        for (Converter cverter : ((MultiConverter) this.converter).getConverters()) {
+          stream = cverter.processStream(stream, this.taskState);
+        }
+      } else {
+        stream = this.converter.processStream(stream, this.taskState);
+      }
     }
     stream = this.rowChecker.processStream(stream, this.taskState);
 
@@ -124,7 +135,7 @@ public class StreamModelTaskRunner {
         if (isForkAsync) {
           forkedStream = forkedStream.mapStream(f -> f.observeOn(Schedulers.from(this.taskExecutor.getForkExecutor()), false, bufferSize));
         }
-        Fork fork = new Fork(this.taskContext, forkedStream.getSchema(), forkedStreams.getForkedStreams().size(), fidx, this.taskMode);
+        Fork fork = new Fork(this.taskContext, forkedStream.getGlobalMetadata().getSchema(), forkedStreams.getForkedStreams().size(), fidx, this.taskMode);
         fork.consumeRecordStream(forkedStream);
         this.forks.put(Optional.of(fork), Optional.of(Futures.immediateFuture(null)));
         this.task.configureStreamingFork(fork, this.watermarkingStrategy);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 69c9a1c..65cf611 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -29,12 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.BooleanUtils;
-import org.apache.gobblin.converter.DataConversionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -43,14 +44,13 @@ import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import lombok.NoArgsConstructor;
-
 import org.apache.gobblin.Constructs;
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
+import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.fork.CopyHelper;
 import org.apache.gobblin.fork.CopyNotSupportedException;
 import org.apache.gobblin.fork.Copyable;
@@ -64,6 +64,7 @@ import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.SingleTaskDataPublisher;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
+import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.runtime.fork.AsynchronousFork;
 import org.apache.gobblin.runtime.fork.Fork;
 import org.apache.gobblin.runtime.fork.SynchronousFork;
@@ -71,18 +72,13 @@ import org.apache.gobblin.runtime.task.TaskIFace;
 import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.source.extractor.StreamingExtractor;
 import org.apache.gobblin.state.ConstructState;
+import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.AcknowledgableWatermark;
-import org.apache.gobblin.writer.DataWriter;
-import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
-import org.apache.gobblin.writer.MultiWriterWatermarkManager;
-import org.apache.gobblin.writer.TrackerBasedWatermarkManager;
-import org.apache.gobblin.writer.WatermarkAwareWriter;
-import org.apache.gobblin.writer.WatermarkManager;
-import org.apache.gobblin.writer.WatermarkStorage;
+import org.apache.gobblin.writer.*;
+
+import lombok.NoArgsConstructor;
 
 
 /**
@@ -139,6 +135,7 @@ public class Task implements TaskIFace {
   private final Optional<WatermarkManager> watermarkManager;
   private final Optional<FineGrainedWatermarkTracker> watermarkTracker;
   private final Optional<WatermarkStorage> watermarkStorage;
+  private final List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors;
 
   private final Closer closer;
 
@@ -173,7 +170,32 @@ public class Task implements TaskIFace {
     this.extractor =
         closer.register(new InstrumentedExtractorDecorator<>(this.taskState, this.taskContext.getExtractor()));
 
-    this.converter = closer.register(new MultiConverter(this.taskContext.getConverters()));
+    this.recordStreamProcessors = this.taskContext.getRecordStreamProcessors();
+
+    // add record stream processors to closer if they are closeable
+    for (RecordStreamProcessor r: recordStreamProcessors) {
+      if (r instanceof Closeable) {
+        this.closer.register((Closeable)r);
+      }
+    }
+
+    List<Converter<?,?,?,?>> converters = this.taskContext.getConverters();
+
+    this.converter = closer.register(new MultiConverter(converters));
+
+    // can't have both record stream processors and converter lists configured
+    try {
+      Preconditions.checkState(this.recordStreamProcessors.isEmpty() || converters.isEmpty(),
+          "Converters cannot be specified when RecordStreamProcessors are specified");
+    } catch (IllegalStateException e) {
+      try {
+        closer.close();
+      } catch (Throwable t) {
+        LOG.error("Failed to close all open resources", t);
+      }
+      throw new TaskInstantiationException("Converters cannot be specified when RecordStreamProcessors are specified");
+    }
+
     try {
       this.rowChecker = closer.register(this.taskContext.getRowLevelPolicyChecker());
     } catch (Exception e) {
@@ -314,7 +336,7 @@ public class Task implements TaskIFace {
         runSynchronousModel();
       } else {
         new StreamModelTaskRunner(this, this.taskState, this.closer, this.taskContext, this.extractor,
-            this.converter, this.rowChecker, this.taskExecutor, this.taskMode, this.shutdownRequested,
+            this.converter, this.recordStreamProcessors, this.rowChecker, this.taskExecutor, this.taskMode, this.shutdownRequested,
             this.watermarkTracker, this.watermarkManager, this.watermarkStorage, this.forks, this.watermarkingStrategy).run();
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java
index 590cdf9..7b71eba 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskContext.java
@@ -41,6 +41,7 @@ import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckerBuilderFactory
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckerBuilderFactory;
+import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.extractor.Extractor;
@@ -232,6 +233,62 @@ public class TaskContext {
   }
 
   /**
+   * Get the list of pre-fork {@link RecordStreamProcessor}s.
+   *
+   * @return list (possibly empty) of {@link RecordStreamProcessor}s
+   */
+  public List<RecordStreamProcessor<?, ?, ?, ?>> getRecordStreamProcessors() {
+    return getRecordStreamProcessors(-1, this.taskState);
+  }
+
+  /**
+   * Get the list of {@link RecordStreamProcessor}s configured for a given fork branch.
+   * Configured {@link Converter}s are wrapped in an {@link InstrumentedConverterDecorator} that is
+   * initialized with {@code forkTaskState}; other processors are added as instantiated.
+   *
+   * @param index branch index, or -1 for the pre-fork list (see {@link #getRecordStreamProcessors()})
+   * @param forkTaskState a {@link TaskState} instance specific to the fork identified by the branch index
+   * @return list (possibly empty) of {@link RecordStreamProcessor}s, in configured order
+   * @throws RuntimeException if a configured class cannot be loaded or instantiated
+   */
+  @SuppressWarnings("unchecked")
+  public List<RecordStreamProcessor<?, ?, ?, ?>> getRecordStreamProcessors(int index, TaskState forkTaskState) {
+    String streamProcessorClassKey =
+        ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.RECORD_STREAM_PROCESSOR_CLASSES_KEY, index);
+
+    if (!this.taskState.contains(streamProcessorClassKey)) {
+      return Collections.emptyList();
+    }
+
+    if (index >= 0) {
+      forkTaskState.setProp(ConfigurationKeys.FORK_BRANCH_ID_KEY, index);
+    }
+
+    List<RecordStreamProcessor<?, ?, ?, ?>> streamProcessors = Lists.newArrayList();
+    for (String streamProcessorClass : Splitter.on(",").omitEmptyStrings().trimResults()
+        .split(this.taskState.getProp(streamProcessorClassKey))) {
+      try {
+        RecordStreamProcessor<?, ?, ?, ?> streamProcessor =
+            RecordStreamProcessor.class.cast(Class.forName(streamProcessorClass).newInstance());
+
+        if (streamProcessor instanceof Converter) {
+          InstrumentedConverterDecorator instrumentedConverter =
+              new InstrumentedConverterDecorator<>((Converter)streamProcessor);
+          instrumentedConverter.init(forkTaskState);
+          streamProcessors.add(instrumentedConverter);
+        } else {
+          streamProcessors.add(streamProcessor);
+        }
+      } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+        // keep the cause and name the offending class so a bad configuration entry is easy to find
+        throw new RuntimeException("Failed to instantiate RecordStreamProcessor " + streamProcessorClass, e);
+      }
+    }
+
+    return streamProcessors;
+  }
+
+  /**
    * Get the {@link ForkOperator} to be applied to converted input schema and data record.
    *
    * @return {@link ForkOperator} to be used or <code>null</code> if none is specified

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3a035737/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index c2f0cdb..d00938e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -38,27 +39,33 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
 import org.apache.gobblin.fork.IdentityForkOperator;
+import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.publisher.TaskPublisher;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
 import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.ControlMessageInjector;
 import org.apache.gobblin.stream.FlushControlMessage;
+import org.apache.gobblin.stream.MetadataUpdateControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.stream.StreamEntity;
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.DataWriterBuilder;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.*;
 import io.reactivex.Flowable;
 import lombok.AllArgsConstructor;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
 
 
 /**
@@ -109,6 +116,97 @@ public class TestRecordStream {
     Assert.assertEquals(writer.messages, Lists.newArrayList("flush called", "flush called"));
   }
 
+  /**
+   * Verifies that {@link MetadataUpdateControlMessage}s emitted by the extractor change the converter's schema.
+   * @throws Exception if task setup, run, or commit fails
+   */
+  @Test
+  public void testMetadataUpdateControlMessages() throws Exception {
+
+    MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("a"),
+        new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()), new RecordEnvelope<>("b"),
+        new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build())});
+    SchemaAppendConverter converter = new SchemaAppendConverter();
+    MyDataWriter writer = new MyDataWriter();
+
+    Task task = setupTask(extractor, writer, converter);
+
+    task.run();
+    task.commit();
+    Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL);
+
+    Assert.assertEquals(converter.records, Lists.newArrayList("a:schema", "b:Schema1"));
+    Assert.assertEquals(converter.messages,
+        Lists.newArrayList(new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()),
+            new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build())));
+
+    Assert.assertEquals(writer.records, Lists.newArrayList("a:schema", "b:Schema1"));
+    Assert.assertEquals(writer.messages, Lists.newArrayList(new MetadataUpdateControlMessage<>(
+        GlobalMetadata.<String>builder().schema("Schema1").build()),
+        new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build())));
+  }
+
+  /**
+   * Same as the metadata update test, but with the converter configured as a {@link RecordStreamProcessor}.
+   * @throws Exception if task setup, run, or commit fails
+   */
+  @Test
+  public void testMetadataUpdateWithStreamProcessors() throws Exception {
+
+    MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("a"),
+        new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()), new RecordEnvelope<>("b"),
+        new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build())});
+    SchemaAppendConverter converter = new SchemaAppendConverter();
+    MyDataWriter writer = new MyDataWriter();
+
+    Task task = setupTask(extractor, writer, Collections.emptyList(), Lists.newArrayList(converter));
+
+    task.run();
+    task.commit();
+    Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL);
+
+    Assert.assertEquals(converter.records, Lists.newArrayList("a:schema", "b:Schema1"));
+    Assert.assertEquals(converter.messages,
+        Lists.newArrayList(new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema1").build()),
+            new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build())));
+
+    Assert.assertEquals(writer.records, Lists.newArrayList("a:schema", "b:Schema1"));
+    Assert.assertEquals(writer.messages, Lists.newArrayList(new MetadataUpdateControlMessage<>(
+        GlobalMetadata.<String>builder().schema("Schema1").build()),
+        new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("Schema2").build())));
+  }
+
+  /**
+   * Verifies that control messages injected by {@link SchemaChangeDetectionInjector} reach the converter and writer.
+   * @throws Exception if task setup, run, or commit fails
+   */
+  @Test
+  public void testInjectedControlMessages() throws Exception {
+
+    MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("schema:a"),
+        new RecordEnvelope<>("schema:b"), new RecordEnvelope<>("schema1:c"), new RecordEnvelope<>("schema2:d")});
+    SchemaChangeDetectionInjector injector = new SchemaChangeDetectionInjector();
+    SchemaAppendConverter converter = new SchemaAppendConverter();
+    MyDataWriter writer = new MyDataWriter();
+
+    Task task = setupTask(extractor, writer, Collections.emptyList(),
+        Lists.newArrayList(injector, converter));
+
+    task.run();
+    task.commit();
+    Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL);
+
+    Assert.assertEquals(converter.records, Lists.newArrayList("a:schema", "b:schema", "c:schema1", "d:schema2"));
+    Assert.assertEquals(converter.messages,
+        Lists.newArrayList(new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("schema1").build()),
+            new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("schema2").build())));
+
+    Assert.assertEquals(writer.records, Lists.newArrayList("a:schema", "b:schema", "c:schema1", "d:schema2"));
+    Assert.assertEquals(writer.messages, Lists.newArrayList(new MetadataUpdateControlMessage<>(
+        GlobalMetadata.<String>builder().schema("schema1").build()),
+        new MetadataUpdateControlMessage<>(GlobalMetadata.<String>builder().schema("schema2").build())));
+  }
+
   @Test
   public void testAcks() throws Exception {
 
@@ -178,6 +276,11 @@ public class TestRecordStream {
   }
 
   private Task setupTask(Extractor extractor, DataWriterBuilder writer, Converter converter) throws Exception {
+    return setupTask(extractor, writer, Lists.newArrayList(converter), Collections.emptyList());
+  }
+
+  private Task setupTask(Extractor extractor, DataWriterBuilder writer, List<Converter<?,?,?,?>> converters,
+      List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors) throws Exception {
     // Create a TaskState
     TaskState taskState = getEmptyTestTaskState("testRetryTaskId");
     taskState.setProp(ConfigurationKeys.TASK_SYNCHRONOUS_EXECUTION_MODEL_KEY, false);
@@ -186,7 +289,8 @@ public class TestRecordStream {
     when(mockTaskContext.getExtractor()).thenReturn(extractor);
     when(mockTaskContext.getForkOperator()).thenReturn(new IdentityForkOperator());
     when(mockTaskContext.getTaskState()).thenReturn(taskState);
-    when(mockTaskContext.getConverters()).thenReturn(Lists.newArrayList(converter));
+    when(mockTaskContext.getConverters()).thenReturn(converters);
+    when(mockTaskContext.getRecordStreamProcessors()).thenReturn(recordStreamProcessors);
     when(mockTaskContext.getTaskLevelPolicyChecker(any(TaskState.class), anyInt()))
         .thenReturn(mock(TaskLevelPolicyChecker.class));
     when(mockTaskContext.getRowLevelPolicyChecker()).
@@ -241,7 +345,8 @@ public class TestRecordStream {
 
     @Override
     public RecordStreamWithMetadata<String, String> recordStream(AtomicBoolean shutdownRequest) throws IOException {
-      return new RecordStreamWithMetadata<>(Flowable.fromArray(this.stream), "schema");
+      return new RecordStreamWithMetadata<>(Flowable.fromArray(this.stream),
+          GlobalMetadata.<String>builder().schema("schema").build());
     }
   }
 
@@ -306,6 +411,81 @@ public class TestRecordStream {
     }
   }
 
+
+  /**
+   * Converter that drops everything up to and including the first ':' of a record, then appends ":" + output schema.
+   */
+  static class SchemaAppendConverter extends Converter<String, String, String, String> {
+    private final List<String> records = new ArrayList<>();
+    private final List<ControlMessage<String>> messages = new ArrayList<>();
+
+    @Override
+    public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+      return inputSchema;
+    }
+
+    @Override
+    public Iterable<String> convertRecord(String outputSchema, String inputRecord, WorkUnitState workUnit)
+        throws DataConversionException {
+      String inputWithoutSchema = inputRecord.substring(inputRecord.indexOf(":") + 1);
+      String outputRecord = inputWithoutSchema + ":" + outputSchema;
+      records.add(outputRecord);
+      return Lists.newArrayList(outputRecord);
+    }
+
+    @Override
+    public ControlMessageHandler getMessageHandler() {
+      return messages::add;
+    }
+  }
+
+  /**
+   * Input to this {@link RecordStreamProcessor} is a string of the form "schema:value". It injects a
+   * {@link MetadataUpdateControlMessage} before the first record whose schema differs from the current one.
+   */
+  static class SchemaChangeDetectionInjector extends ControlMessageInjector<String, String> {
+    private final List<String> records = new ArrayList<>();
+    private final List<ControlMessage<String>> messages = new ArrayList<>();
+    private GlobalMetadata<String> globalMetadata;
+
+    public Iterable<String> convertRecord(String outputSchema, String inputRecord, WorkUnitState workUnitState)
+        throws DataConversionException {
+      String outputRecord = inputRecord.split(":")[1];
+      records.add(outputRecord);
+      return Lists.newArrayList(outputRecord);
+    }
+
+    @Override
+    protected void setInputGlobalMetadata(GlobalMetadata<String> inputGlobalMetadata, WorkUnitState workUnitState) {
+      this.globalMetadata = inputGlobalMetadata;
+    }
+
+    @Override
+    public Iterable<ControlMessage<String>> injectControlMessagesBefore(RecordEnvelope<String> inputRecordEnvelope,
+        WorkUnitState workUnitState) {
+      String recordSchema = inputRecordEnvelope.getRecord().split(":")[0];
+
+      if (!recordSchema.equals(this.globalMetadata.getSchema())) {
+        // track the detected schema so later records with the same schema do not inject again
+        this.globalMetadata = GlobalMetadata.<String>builder().schema(recordSchema).build();
+        return new SingleRecordIterable<>(new MetadataUpdateControlMessage<>(this.globalMetadata));
+      }
+
+      return null;
+    }
+
+    @Override
+    public Iterable<ControlMessage<String>> injectControlMessagesAfter(RecordEnvelope<String> inputRecordEnvelope,
+        WorkUnitState workUnitState) {
+      return null;
+    }
+
+    @Override
+    public ControlMessageHandler getMessageHandler() {
+      return messages::add;
+    }
+  }
+
   static class MyFlushDataWriter extends DataWriterBuilder<String, String> implements DataWriter<String> {
     private List<String> records = new ArrayList<>();
     private List<String> messages = new ArrayList<>();


Mime
View raw message