gobblin-commits mailing list archives

From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-319] Add DatasetResolver to transform raw Gobblin dataset to application specific dataset
Date Thu, 30 Nov 2017 23:27:15 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 3a28721d8 -> 9fd80690d


[GOBBLIN-319] Add DatasetResolver to transform raw Gobblin dataset to application specific dataset

Closes #2171 from zxcware/lineage


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9fd80690
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9fd80690
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9fd80690

Branch: refs/heads/master
Commit: 9fd80690db540c373321a2d434b64d8706d80d65
Parents: 3a28721
Author: zhchen <zhchen@linkedin.com>
Authored: Thu Nov 30 15:26:16 2017 -0800
Committer: Hung Tran <hutran@linkedin.com>
Committed: Thu Nov 30 15:26:29 2017 -0800

----------------------------------------------------------------------
 .../gobblin/dataset/DatasetConstants.java       |  5 +
 .../apache/gobblin/dataset/DatasetResolver.java | 35 +++++++
 .../gobblin/dataset/DatasetResolverFactory.java | 31 ++++++
 .../gobblin/dataset/NoopDatasetResolver.java    | 35 +++++++
 .../gobblin/publisher/BaseDataPublisher.java    | 28 ++++--
 .../publisher/BaseDataPublisherTest.java        | 52 ++++++++++
 .../data/management/copy/CopySource.java        | 17 +++-
 .../management/copy/CopyableDatasetBase.java    |  7 --
 .../data/management/copy/CopyableFile.java      |  7 +-
 .../copy/RecursiveCopyableDataset.java          | 27 ++++--
 .../copy/hive/HiveCopyEntityHelper.java         | 15 ++-
 .../data/management/copy/hive/HiveDataset.java  |  6 --
 .../copy/hive/HivePartitionFileSet.java         |  2 +-
 .../copy/hive/UnpartitionedTableFileSet.java    |  2 +-
 .../copy/publisher/CopyDataPublisher.java       |  2 +-
 .../metrics/event/lineage/LineageException.java | 32 -------
 .../metrics/event/lineage/LineageInfo.java      | 73 +++++++++------
 .../metrics/event/lineage/LineageEventTest.java | 99 +++++++++++---------
 .../gobblin/runtime/SafeDatasetCommit.java      | 17 ++--
 19 files changed, 335 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
index 73999dc..35bb09e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
@@ -19,10 +19,15 @@ package org.apache.gobblin.dataset;
 
 public class DatasetConstants {
   /** Platforms */
+  public static final String PLATFORM_FILE = "file";
+  public static final String PLATFORM_HDFS = "hdfs";
   public static final String PLATFORM_KAFKA = "kafka";
   public static final String PLATFORM_HIVE = "hive";
   public static final String PLATFORM_MYSQL = "mysql";
 
+  /** Common metadata */
+  public static final String BRANCH = "branch";
+
   /** File system metadata */
   public static final String FS_URI = "fsUri";
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java
new file mode 100644
index 0000000..0e28169
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A {@link DatasetResolver} resolves a job-specific dataset from a raw Gobblin dataset
+ */
+public interface DatasetResolver {
+  /**
+   * Given a raw Gobblin dataset, resolve the job-specific dataset
+   *
+   * @param raw a dataset in Gobblin terms
+   * @param state configuration that helps resolve the job-specific dataset
+   * @return the resolved dataset for the job
+   */
+  DatasetDescriptor resolve(DatasetDescriptor raw, State state);
+}

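For illustration only, a hypothetical resolver (not part of this commit; class name, package, and
mapping are assumptions) could map a raw HDFS dataset onto a logical dataset by stripping a
trailing date partition from the path:

package com.example.lineage;  // hypothetical package, for illustration

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.DatasetResolver;

/** Hypothetical resolver: "/data/PageViewEvent/2017-11-30" resolves to "/data/PageViewEvent" */
public class DatePartitionStrippingResolver implements DatasetResolver {
  @Override
  public DatasetDescriptor resolve(DatasetDescriptor raw, State state) {
    // Drop a trailing yyyy-MM-dd partition folder from the raw dataset name, if present
    String name = raw.getName().replaceAll("/\\d{4}-\\d{2}-\\d{2}$", "");
    return new DatasetDescriptor(raw.getPlatform(), name);
  }
}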
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
new file mode 100644
index 0000000..eb1b887
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A factory that creates an instance of {@link DatasetResolver}
+ */
+public interface DatasetResolverFactory {
+  String NAMESPACE = "DatasetResolverFactory";
+  String CLASS = NAMESPACE + "." + "class";
+
+  DatasetResolver createResolver(State state);
+}

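A matching factory, again only a sketch assuming the hypothetical resolver above, is what the
DatasetResolverFactory.class key points at:

package com.example.lineage;  // hypothetical package, for illustration

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.DatasetResolver;
import org.apache.gobblin.dataset.DatasetResolverFactory;

/** Hypothetical factory for the resolver sketched above */
public class DatePartitionStrippingResolverFactory implements DatasetResolverFactory {
  @Override
  public DatasetResolver createResolver(State state) {
    return new DatePartitionStrippingResolver();
  }
}

A job would enable it by setting, e.g.,
DatasetResolverFactory.class=com.example.lineage.DatePartitionStrippingResolverFactory.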
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
new file mode 100644
index 0000000..c54011a
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * The default {@link DatasetResolver}, which directly uses the raw Gobblin dataset as the job dataset
+ */
+public class NoopDatasetResolver implements DatasetResolver {
+  public static final NoopDatasetResolver INSTANCE = new NoopDatasetResolver();
+
+  private NoopDatasetResolver() {}
+
+  @Override
+  public DatasetDescriptor resolve(DatasetDescriptor raw, State state) {
+    return raw;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 0097c15..89bab2b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -61,9 +61,11 @@ import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.metadata.MetadataMerger;
 import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.WriterUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.FsDataWriter;
@@ -280,6 +282,20 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
     }
   }
 
+  private void addLineageInfo(WorkUnitState state, int branchId) {
+    DatasetDescriptor destination = createDestinationDescriptor(state, branchId);
+    LineageInfo.putDestination(destination, branchId, state);
+  }
+
+  protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
+    Path publisherOutputDir = getPublisherOutputDir(state, branchId);
+    FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
+    DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString());
+    destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+    destination.addMetadata(DatasetConstants.BRANCH, String.valueOf(branchId));
+    return destination;
+  }
+
   @Override
   public void publishData(WorkUnitState state)
       throws IOException {
@@ -296,6 +312,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
   private void publishSingleTaskData(WorkUnitState state, int branchId)
       throws IOException {
     publishData(state, branchId, true, new HashSet<Path>());
+    addLineageInfo(state, branchId);
   }
 
   @Override
@@ -329,16 +346,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
   private void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved)
       throws IOException {
     publishData(state, branchId, false, writerOutputPathsMoved);
-    DatasetDescriptor destination = createDestinationDescriptor(state, branchId);
-    LineageInfo.putDestination(destination, branchId, state);
-  }
-
-  protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
-    Path publisherOutputDir = getPublisherOutputDir(state, branchId);
-    FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
-    DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString());
-    destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
-    return destination;
+    addLineageInfo(state, branchId);
   }
 
   protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
index 09bc0c8..159786b 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
@@ -28,10 +28,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -41,8 +44,10 @@ import com.google.common.io.Files;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.metadata.MetadataMerger;
 import org.apache.gobblin.metadata.types.GlobalMetadata;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.writer.FsDataWriter;
 import org.apache.gobblin.writer.FsWriterMetrics;
@@ -519,6 +524,34 @@ public class BaseDataPublisherTest {
     }
   }
 
+  @Test
+  public void testPublishSingleTask()
+      throws IOException {
+    WorkUnitState state = buildTaskState(1);
+    DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
+    LineageInfo.setSource(source, state);
+    BaseDataPublisher publisher = new BaseDataPublisher(state);
+    publisher.publishData(state);
+    Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination"));
+    Assert.assertFalse(state.contains("gobblin.event.lineage.branch.1.destination"));
+  }
+
+  @Test
+  public void testPublishMultiTasks()
+      throws IOException {
+    WorkUnitState state1 = buildTaskState(2);
+    WorkUnitState state2 = buildTaskState(2);
+    DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
+    LineageInfo.setSource(source, state1);
+    LineageInfo.setSource(source, state2);
+    BaseDataPublisher publisher = new BaseDataPublisher(state1);
+    publisher.publishData(ImmutableList.of(state1, state2));
+    Assert.assertTrue(state1.contains("gobblin.event.lineage.branch.0.destination"));
+    Assert.assertTrue(state1.contains("gobblin.event.lineage.branch.1.destination"));
+    Assert.assertTrue(state2.contains("gobblin.event.lineage.branch.0.destination"));
+    Assert.assertTrue(state2.contains("gobblin.event.lineage.branch.1.destination"));
+  }
+
   public static class TestAdditionMerger implements MetadataMerger<String> {
     private int sum = 0;
 
@@ -588,4 +621,23 @@ public class BaseDataPublisherTest {
 
     return state;
   }
+
+  private WorkUnitState buildTaskState(int numBranches) {
+    WorkUnitState state = new WorkUnitState();
+
+    state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "namespace");
+    state.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table");
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "namespace_table");
+    state.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, numBranches);
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/data/output");
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "/data/working");
+    if (numBranches > 1) {
+      for (int i = 0; i < numBranches; i++) {
+        state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "." + i, "/data/output" + "/branch" + i);
+        state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR + "." + i, "/data/working" + "/branch" + i);
+      }
+    }
+
+    return state;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 8ca05e3..a74d425 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -65,7 +65,6 @@ import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.source.extractor.Extractor;
@@ -300,7 +299,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
           workUnitsForPartition.add(workUnit);
-          addLineageInfo(copyEntity, copyableDataset, workUnit);
+          addLineageInfo(copyEntity, workUnit);
         }
 
         this.workUnitList.putAll(this.fileSet, workUnitsForPartition);
@@ -313,9 +312,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     }
   }
 
-  private void addLineageInfo(CopyEntity copyEntity, CopyableDatasetBase copyableDataset, WorkUnit workUnit) {
-    if (copyEntity instanceof CopyableFile && copyableDataset.getDatasetDescriptor() != null) {
-      LineageInfo.setSource(copyableDataset.getDatasetDescriptor(), workUnit);
+  private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
+    if (copyEntity instanceof CopyableFile) {
+      CopyableFile copyableFile = (CopyableFile) copyEntity;
+      /*
+       * In Gobblin Distcp, the source and target path info of a CopyableFile are determined by its dataset, which is
+       * found by a DatasetFinder. Consequently, the source and destination datasets for the CopyableFile lineage are
+       * expected to be set by the same logic
+       */
+      if (copyableFile.getSourceDataset() != null && copyableFile.getDestinationDataset() != null) {
+        LineageInfo.setSource(copyableFile.getSourceDataset(), workUnit);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
index 6c71edc..c27b839 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.data.management.copy;
 
 import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.dataset.DatasetDescriptor;
 
 
 /**
@@ -26,10 +25,4 @@ import org.apache.gobblin.dataset.DatasetDescriptor;
  * Concrete classes must implement a subinterface of this interface ({@link CopyableDataset} or {@link IterableCopyableDataset}).
  */
 public interface CopyableDatasetBase extends Dataset {
-  /**
-   * Get the descriptor which identifies and provides metadata of the dataset
-   */
-  default DatasetDescriptor getDatasetDescriptor() {
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 04e5e34..843a7e3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -52,12 +52,17 @@ import com.google.common.collect.Lists;
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
 @EqualsAndHashCode(callSuper = true)
 public class CopyableFile extends CopyEntity implements File {
+  /**
+   * The source dataset the file belongs to. For now, since it is only used before copying, it is marked
+   * transient so that it won't be serialized, avoiding unnecessary data transfer
+   */
+  private transient DatasetDescriptor sourceDataset;
 
   /** {@link FileStatus} of the existing origin file. */
   private FileStatus origin;
 
   /** The destination dataset the file will be copied to */
-  private DatasetDescriptor destDataset;
+  private DatasetDescriptor destinationDataset;
 
   /** Complete destination {@link Path} of the file. */
   private Path destination;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 35108df..138debe 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.data.management.copy;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.PathUtils;
@@ -43,8 +44,6 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import lombok.Getter;
-
 
 /**
  * Implementation of {@link CopyableDataset} that creates a {@link CopyableFile} for every file that is a descendant if
@@ -69,9 +68,6 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
   private final boolean update;
   private final boolean delete;
 
-  @Getter
-  private transient final DatasetDescriptor datasetDescriptor;
-
   // Include empty directories in the source for copy
   private final boolean includeEmptyDirectories;
   // Delete empty directories in the destination
@@ -83,7 +79,6 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
 
     this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath);
     this.fs = fs;
-    this.datasetDescriptor = new DatasetDescriptor(fs.getScheme(), rootPath.toString());
 
     this.pathFilter = DatasetUtils.instantiatePathFilter(properties);
     this.copyableFileFilter = DatasetUtils.instantiateCopyableFileFilter(properties);
@@ -136,7 +131,6 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
 
     List<CopyEntity> copyEntities = Lists.newArrayList();
     List<CopyableFile> copyableFiles = Lists.newArrayList();
-    DatasetDescriptor targetDataset = new DatasetDescriptor(targetFs.getScheme(), targetPath.toString());
 
     for (Path path : toCopy) {
       FileStatus file = filesInSource.get(path);
@@ -147,7 +141,24 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
           .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
               file.getPath().getParent(), nonGlobSearchPath, configuration))
           .build();
-      copyableFile.setDestDataset(targetDataset);
+
+      /*
+       * By default, the raw Gobblin dataset for CopyableFile lineage is the file's parent folder
+       * if the file itself is not a folder
+       */
+      boolean isDir = file.isDirectory();
+
+      Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(file.getPath());
+      String sourceDataset = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
+      DatasetDescriptor source = new DatasetDescriptor(this.fs.getScheme(), sourceDataset);
+      source.addMetadata(DatasetConstants.FS_URI, this.fs.getUri().toString());
+      copyableFile.setSourceDataset(source);
+
+      String destinationDataset = isDir ? thisTargetPath.toString() : thisTargetPath.getParent().toString();
+      DatasetDescriptor destination = new DatasetDescriptor(targetFs.getScheme(), destinationDataset);
+      destination.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
+      copyableFile.setDestinationDataset(destination);
+
       copyableFiles.add(copyableFile);
     }
     copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles));

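To make the parent-folder convention above concrete: copying a single file
hdfs://nn:8020/data/in/part-0000 to hdfs://nn:8020/data/out/part-0000 (hypothetical paths)
yields descriptors equivalent to the following, per the logic above:

DatasetDescriptor source = new DatasetDescriptor("hdfs", "/data/in");        // parent folder of the source file
source.addMetadata(DatasetConstants.FS_URI, "hdfs://nn:8020");

DatasetDescriptor destination = new DatasetDescriptor("hdfs", "/data/out");  // parent folder of the target path
destination.addMetadata(DatasetConstants.FS_URI, "hdfs://nn:8020");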
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index cc7be1e..2580775 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -763,9 +763,18 @@ public class HiveCopyEntityHelper {
     return this.targetFs;
   }
 
-  void setCopyableFileDestinationDataset(CopyableFile copyableFile) {
-    DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, this.getTargetDatabase() + "." + this.getTargetTable());
+  /**
+   * Set the source and destination datasets of a copyable file
+   */
+  void setCopyableFileDatasets(CopyableFile copyableFile) {
+    String sourceTable = dataset.getTable().getDbName() + "." + dataset.getTable().getTableName();
+    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+    source.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString());
+    copyableFile.setSourceDataset(source);
+
+    String destinationTable = this.getTargetDatabase() + "." + this.getTargetTable();
+    DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable);
     destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
-    copyableFile.setDestDataset(destination);
+    copyableFile.setDestinationDataset(destination);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index 03dba25..26c7d7e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -57,12 +57,10 @@ import org.apache.gobblin.data.management.copy.CopyableDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder.DbAndTable;
 import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
 import org.apache.gobblin.data.management.partition.FileSet;
-import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.AutoReturnableObject;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
@@ -108,8 +106,6 @@ public class HiveDataset implements PrioritizedCopyableDataset {
   protected final DbAndTable dbAndTable;
   protected final DbAndTable logicalDbAndTable;
 
-  private transient final DatasetDescriptor datasetDescriptor;
-
   public HiveDataset(FileSystem fs, HiveMetastoreClientPool clientPool, Table table, Properties properties) {
     this(fs, clientPool, table, properties, ConfigFactory.empty());
   }
@@ -128,8 +124,6 @@ public class HiveDataset implements PrioritizedCopyableDataset {
         Optional.fromNullable(this.table.getDataLocation());
 
     this.tableIdentifier = this.table.getDbName() + "." + this.table.getTableName();
-    this.datasetDescriptor = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, tableIdentifier);
-    this.datasetDescriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
 
     this.datasetNamePattern = Optional.fromNullable(ConfigUtils.getString(datasetConfig, DATASET_NAME_PATTERN_KEY, null));
     this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 790c0b4..34b6933 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -152,7 +152,7 @@ public class HivePartitionFileSet extends HiveFileSet {
         CopyableFile fileEntity =
             builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString())
                 .build();
-        this.hiveCopyEntityHelper.setCopyableFileDestinationDataset(fileEntity);
+        this.hiveCopyEntityHelper.setCopyableFileDatasets(fileEntity);
         copyEntities.add(fileEntity);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index 4d82a62..21813fb 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -122,7 +122,7 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
         Optional.<Partition> absent())) {
       CopyableFile fileEntity =
           builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build();
-      this.helper.setCopyableFileDestinationDataset(fileEntity);
+      this.helper.setCopyableFileDatasets(fileEntity);
       copyEntities.add(fileEntity);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 71ebd59..e1ccf65 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -214,7 +214,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
           if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) {
             fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
           }
-          LineageInfo.putDestination(copyableFile.getDestDataset(), 0, wus);
+          LineageInfo.putDestination(copyableFile.getDestinationDataset(), 0, wus);
         }
         if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
           datasetOriginTimestamp = copyableFile.getOriginTimestamp();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
deleted file mode 100644
index e7528b3..0000000
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.metrics.event.lineage;
-
-/**
- * A set of exceptions used by {@link LineageEventBuilder} when lineage information is serialized or deserialized.
- */
-public class LineageException extends Exception {
-  public LineageException(String message) {
-    super(message);
-  }
-  public static class ConflictException extends LineageException {
-    public ConflictException(String branchId, LineageEventBuilder actual, LineageEventBuilder expect) {
-      super("Conflict LineageEvent: branchId=" + branchId + ", expected=" + expect.toString() + " actual=" + actual.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index 80b9dec..9a3cc11 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -19,16 +19,21 @@ package org.apache.gobblin.metrics.event.lineage;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.DatasetResolver;
+import org.apache.gobblin.dataset.DatasetResolverFactory;
+import org.apache.gobblin.dataset.NoopDatasetResolver;
 
 
 /**
@@ -57,8 +62,7 @@ import org.apache.gobblin.dataset.DatasetDescriptor;
  */
 @Slf4j
 public final class LineageInfo {
-  public static final String BRANCH = "branch";
-
+  private static final String BRANCH = "branch";
   private static final Gson GSON = new Gson();
   private static final String NAME_KEY = "name";
 
@@ -77,8 +81,14 @@ public final class LineageInfo {
    *
    */
   public static void setSource(DatasetDescriptor source, State state) {
-    state.setProp(getKey(NAME_KEY), source.getName());
-    state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(source));
+    DatasetResolver resolver = getResolver(state);
+    DatasetDescriptor descriptor = resolver.resolve(source, state);
+    if (descriptor == null) {
+      return;
+    }
+
+    state.setProp(getKey(NAME_KEY), descriptor.getName());
+    state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(descriptor));
   }
 
   /**
@@ -95,42 +105,32 @@ public final class LineageInfo {
       log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination));
       return;
     }
-
+    log.debug(String.format("Put destination %s for branch %d", GSON.toJson(destination), branchId));
     synchronized (state.getProp(getKey(NAME_KEY))) {
-      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(destination));
+      DatasetResolver resolver = getResolver(state);
+      DatasetDescriptor descriptor = resolver.resolve(destination, state);
+      if (descriptor == null) {
+        return;
+      }
+
+      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(descriptor));
     }
   }
 
   /**
    * Load all lineage information from {@link State}s of a dataset
    *
-   * <p>
-   *   For a dataset, the same branch across different {@link State}s must be the same, as
-   *   the same branch means the same destination
-   * </p>
-   *
    * @param states All states which belong to the same dataset
    * @return A collection of {@link LineageEventBuilder}s put in the state
-   * @throws LineageException.ConflictException if two states have conflict lineage info
    */
-  public static Collection<LineageEventBuilder> load(Collection<? extends State> states)
-      throws LineageException {
+  public static Collection<LineageEventBuilder> load(Collection<? extends State> states) {
     Preconditions.checkArgument(states != null && !states.isEmpty());
-    final Map<String, LineageEventBuilder> resultEvents = Maps.newHashMap();
+    Set<LineageEventBuilder> allEvents = Sets.newHashSet();
     for (State state : states) {
       Map<String, LineageEventBuilder> branchedEvents = load(state);
-      for (Map.Entry<String, LineageEventBuilder> entry : branchedEvents.entrySet()) {
-        String branch = entry.getKey();
-        LineageEventBuilder event = entry.getValue();
-        LineageEventBuilder resultEvent = resultEvents.get(branch);
-        if (resultEvent == null) {
-          resultEvents.put(branch, event);
-        } else if (!resultEvent.equals(event)) {
-          throw new LineageException.ConflictException(branch, event, resultEvent);
-        }
-      }
+      allEvents.addAll(branchedEvents.values());
     }
-    return resultEvents.values();
+    return allEvents;
   }
 
   /**
@@ -162,7 +162,6 @@ public final class LineageInfo {
       switch (parts[1]) {
         case LineageEventBuilder.DESTINATION:
           DatasetDescriptor destination = GSON.fromJson(entry.getValue().toString(), DatasetDescriptor.class);
-          destination.addMetadata(BRANCH, branchId);
           event.setDestination(destination);
           break;
         default:
@@ -195,6 +194,26 @@ public final class LineageInfo {
   }
 
   /**
+   * Get the configured {@link DatasetResolver} from {@link State}
+   */
+  public static DatasetResolver getResolver(State state) {
+    String resolverFactory = state.getProp(DatasetResolverFactory.CLASS);
+    if (resolverFactory == null) {
+      return NoopDatasetResolver.INSTANCE;
+    }
+
+    DatasetResolver resolver = NoopDatasetResolver.INSTANCE;
+    try {
+      DatasetResolverFactory factory = (DatasetResolverFactory) Class.forName(resolverFactory).newInstance();
+      resolver = factory.createResolver(state);
+    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+      log.error(String.format("Fail to create a DatasetResolver with factory class %s", resolverFactory));
+    }
+
+    return resolver;
+  }
+
+  /**
    * Prefix all keys with {@link LineageEventBuilder#LIENAGE_EVENT_NAMESPACE}
    */
   private static String getKey(Object... objects) {

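With these changes, both setSource and putDestination pass descriptors through the resolver
configured via the DatasetResolverFactory.class key (defaulting to NoopDatasetResolver), and a
null result drops the lineage entry. A minimal usage sketch, with a hypothetical factory class:

State state = new State();
state.setProp("DatasetResolverFactory.class",
    "com.example.lineage.DatePartitionStrippingResolverFactory");  // hypothetical factory, see sketch above
LineageInfo.setSource(new DatasetDescriptor("hdfs", "/data/PageViewEvent/2017-11-30"), state);
// The stored source is the resolved descriptor, e.g. "/data/PageViewEvent"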
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index 4e711b9..7388de6 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 
@@ -31,21 +32,31 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Lists;
 
 
+/**
+ * Test for loading lineage events from state
+ */
 public class LineageEventTest {
   @Test
   public void testEvent() {
     final String topic = "testTopic";
+    final String kafka = "kafka";
+    final String hdfs = "hdfs";
+    final String mysql = "mysql";
+    final String branch = "branch";
+
     State state0 = new State();
-    DatasetDescriptor source = new DatasetDescriptor("kafka", topic);
+    DatasetDescriptor source = new DatasetDescriptor(kafka, topic);
     LineageInfo.setSource(source, state0);
-    DatasetDescriptor destination0 = new DatasetDescriptor("hdfs", "/data/dbchanges");
-    LineageInfo.putDestination(destination0, 0, state0);
-    DatasetDescriptor destination1 = new DatasetDescriptor("mysql", "kafka.testTopic");
-    LineageInfo.putDestination(destination1, 1, state0);
+    DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, "/data/dbchanges");
+    destination00.addMetadata(branch, "0");
+    LineageInfo.putDestination(destination00, 0, state0);
+    DatasetDescriptor destination01 = new DatasetDescriptor(mysql, "kafka.testTopic");
+    destination01.addMetadata(branch, "1");
+    LineageInfo.putDestination(destination01, 1, state0);
 
     Map<String, LineageEventBuilder> events = LineageInfo.load(state0);
-    verify(events.get("0"), topic, source, destination0, 0);
-    verify(events.get("1"), topic, source, destination1, 1);
+    verify(events.get("0"), topic, source, destination00);
+    verify(events.get("1"), topic, source, destination01);
 
     State state1 = new State();
     LineageInfo.setSource(source, state1);
@@ -54,60 +65,56 @@ public class LineageEventTest {
     states.add(state1);
 
     // Test only full fledged lineage events are loaded
-    try {
-      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
-      Assert.assertTrue(eventsList.size() == 2);
-      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
-      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
-    } catch (LineageException e) {
-      Assert.fail("Unexpected exception");
-    }
+    Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+    Assert.assertTrue(eventsList.size() == 2);
+    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
+    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
 
     // There are 3 full fledged lineage events
-    DatasetDescriptor destination2 = new DatasetDescriptor("mysql", "kafka.testTopic2");
-    LineageInfo.putDestination(destination2, 2, state1);
-    try {
-      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
-      Assert.assertTrue(eventsList.size() == 3);
-      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
-      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
-      verify(getLineageEvent(eventsList, 2), topic, source, destination2, 2);
-    } catch (LineageException e) {
-      Assert.fail("Unexpected exception");
+    DatasetDescriptor destination12 = new DatasetDescriptor(mysql, "kafka.testTopic2");
+    destination12.addMetadata(branch, "2");
+    LineageInfo.putDestination(destination12, 2, state1);
+    eventsList = LineageInfo.load(states);
+    Assert.assertTrue(eventsList.size() == 3);
+    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
+    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+    verify(getLineageEvent(eventsList, 2, mysql), topic, source, destination12);
+
+
+    // There are 5 lineage events put, but only 4 unique lineage events
+    DatasetDescriptor destination10 = destination12;
+    LineageInfo.putDestination(destination10, 0, state1);
+    DatasetDescriptor destination11 = new DatasetDescriptor("hive", "kafka.testTopic1");
+    destination11.addMetadata(branch, "1");
+    LineageInfo.putDestination(destination11, 1, state1);
+    eventsList = LineageInfo.load(states);
+    Assert.assertTrue(eventsList.size() == 4);
+    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
+    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+    // Either branch 0 or 2 of state 1 is selected
+    LineageEventBuilder event12 = getLineageEvent(eventsList, 0, mysql);
+    if (event12 == null) {
+      event12 = getLineageEvent(eventsList, 2, mysql);
     }
-
-    // Throw conflict exception when there is a conflict on a branch between 2 states
-    LineageInfo.putDestination(destination2, 0, state1);
-    boolean hasLineageException = false;
-    try {
-      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
-    } catch (LineageException e) {
-      Assert.assertTrue(e instanceof LineageException.ConflictException);
-      hasLineageException = true;
-    }
-    Assert.assertTrue(hasLineageException);
+    verify(event12, topic, source, destination12);
+    verify(getLineageEvent(eventsList, 1, "hive"), topic, source, destination11);
   }
 
-  private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId) {
+  private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId, String destinationPlatform) {
     for (LineageEventBuilder event : events) {
-      if (event.getDestination().getMetadata().get(LineageInfo.BRANCH).equals(String.valueOf(branchId))) {
+      if (event.getDestination().getPlatform().equals(destinationPlatform) &&
+          event.getDestination().getMetadata().get(DatasetConstants.BRANCH).equals(String.valueOf(branchId))) {
         return event;
       }
     }
     return null;
   }
 
-  private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination, int branchId) {
+  private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination) {
     Assert.assertEquals(event.getName(), name);
     Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
     Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE);
     Assert.assertTrue(event.getSource().equals(source));
-
-    DatasetDescriptor updatedDestination = new DatasetDescriptor(destination);
-    updatedDestination.addMetadata(LineageInfo.BRANCH, String.valueOf(branchId));
-    Assert.assertTrue(event.getDestination().equals(updatedDestination));
-
-    // It only has eventType info
-    Assert.assertTrue(event.getMetadata().size() == 1);
+    Assert.assertTrue(event.getDestination().equals(destination));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 7ff9bb1..43e5c59 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -38,7 +38,6 @@ import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.event.lineage.LineageException;
 import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.FailureEventBuilder;
@@ -210,20 +209,19 @@ final class SafeDatasetCommit implements Callable<Void> {
       }
     }
     if (states.size() == 0) {
+      log.info("Will not submit lineage events as no state contains lineage info");
       return;
     }
 
     try {
       if (StringUtils.isEmpty(datasetUrn)) {
         // This dataset may contain different kinds of LineageEvent
-        for (Collection<TaskState> collection : aggregateByLineageEvent(states)) {
-          submitLineageEvent(collection);
+        for (Map.Entry<String, Collection<TaskState>> entry : aggregateByLineageEvent(states).entrySet()) {
+          submitLineageEvent(entry.getKey(), entry.getValue());
         }
       } else {
-        submitLineageEvent(states);
+        submitLineageEvent(datasetUrn, states);
       }
-    } catch (LineageException e) {
-      log.error("Lineage event submission failed due to :" + e.toString());
     } finally {
       // Purge lineage info from all states
       for (TaskState taskState : allStates) {
@@ -232,10 +230,11 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
   }
 
-  private void submitLineageEvent(Collection<TaskState> states) throws LineageException {
+  private void submitLineageEvent(String dataset, Collection<TaskState> states) {
     Collection<LineageEventBuilder> events = LineageInfo.load(states);
     // Send events
     events.forEach(event -> event.submit(metricContext));
+    log.info(String.format("Submitted %d lineage events for dataset %s", events.size(), dataset));
   }
 
   /**
@@ -425,7 +424,7 @@ final class SafeDatasetCommit implements Callable<Void> {
         .withDatasetState(datasetState).build());
   }
 
-  private static Collection<Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) {
+  private static Map<String, Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) {
     Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap();
     for (TaskState state : states) {
       String eventName = LineageInfo.getFullEventName(state);
@@ -433,6 +432,6 @@ final class SafeDatasetCommit implements Callable<Void> {
       statesForEvent.add(state);
     }
 
-    return statesByEvents.values();
+    return statesByEvents;
   }
 }

