gobblin-commits mailing list archives

From hut...@apache.org
Subject [1/2] incubator-gobblin git commit: [GOBBLIN-307] Implement lineage event as LineageEventBuilder in gobblin
Date Mon, 13 Nov 2017 22:15:57 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a34a81a42 -> 3e229db98


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
new file mode 100644
index 0000000..4e711b9
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+
+public class LineageEventTest {
+  @Test
+  public void testEvent() {
+    final String topic = "testTopic";
+    State state0 = new State();
+    DatasetDescriptor source = new DatasetDescriptor("kafka", topic);
+    LineageInfo.setSource(source, state0);
+    DatasetDescriptor destination0 = new DatasetDescriptor("hdfs", "/data/dbchanges");
+    LineageInfo.putDestination(destination0, 0, state0);
+    DatasetDescriptor destination1 = new DatasetDescriptor("mysql", "kafka.testTopic");
+    LineageInfo.putDestination(destination1, 1, state0);
+
+    Map<String, LineageEventBuilder> events = LineageInfo.load(state0);
+    verify(events.get("0"), topic, source, destination0, 0);
+    verify(events.get("1"), topic, source, destination1, 1);
+
+    State state1 = new State();
+    LineageInfo.setSource(source, state1);
+    List<State> states = Lists.newArrayList();
+    states.add(state0);
+    states.add(state1);
+
+    // Test only full fledged lineage events are loaded
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+      Assert.assertTrue(eventsList.size() == 2);
+      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
+      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
+    } catch (LineageException e) {
+      Assert.fail("Unexpected exception");
+    }
+
+    // There are 3 full fledged lineage events
+    DatasetDescriptor destination2 = new DatasetDescriptor("mysql", "kafka.testTopic2");
+    LineageInfo.putDestination(destination2, 2, state1);
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+      Assert.assertTrue(eventsList.size() == 3);
+      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
+      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
+      verify(getLineageEvent(eventsList, 2), topic, source, destination2, 2);
+    } catch (LineageException e) {
+      Assert.fail("Unexpected exception");
+    }
+
+    // Throw conflict exception when there is a conflict on a branch between 2 states
+    LineageInfo.putDestination(destination2, 0, state1);
+    boolean hasLineageException = false;
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+    } catch (LineageException e) {
+      Assert.assertTrue(e instanceof LineageException.ConflictException);
+      hasLineageException = true;
+    }
+    Assert.assertTrue(hasLineageException);
+  }
+
+  private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId) {
+    for (LineageEventBuilder event : events) {
+      if (event.getDestination().getMetadata().get(LineageInfo.BRANCH).equals(String.valueOf(branchId))) {
+        return event;
+      }
+    }
+    return null;
+  }
+
+  private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination, int branchId) {
+    Assert.assertEquals(event.getName(), name);
+    Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
+    Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE);
+    Assert.assertTrue(event.getSource().equals(source));
+
+    DatasetDescriptor updatedDestination = new DatasetDescriptor(destination);
+    updatedDestination.addMetadata(LineageInfo.BRANCH, String.valueOf(branchId));
+    Assert.assertTrue(event.getDestination().equals(updatedDestination));
+
+    // It only has eventType info
+    Assert.assertTrue(event.getMetadata().size() == 1);
+  }
+}
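
For orientation, here is a minimal end-to-end sketch of the API this test exercises: a source and per-branch destinations are attached to a State, then complete events are loaded and submitted to a MetricContext. Only the LineageInfo, LineageEventBuilder and DatasetDescriptor calls come from this commit; the class name LineageEmitSketch and the literal dataset values are illustrative.

import java.util.List;

import com.google.common.collect.Lists;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageException;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;

public class LineageEmitSketch {
  // Attach lineage to a state and submit one event per branch (illustrative values).
  public static void emit(MetricContext metricContext) throws LineageException {
    State state = new State();
    LineageInfo.setSource(new DatasetDescriptor("kafka", "testTopic"), state);
    LineageInfo.putDestination(new DatasetDescriptor("hdfs", "/data/out"), 0, state);

    List<State> states = Lists.newArrayList(state);
    for (LineageEventBuilder event : LineageInfo.load(states)) {
      event.submit(metricContext);  // same call SafeDatasetCommit makes below
    }
  }
}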

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 45223fb..8cd25c2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,7 +30,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +55,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.EventBasedSource;
 import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
 import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
@@ -549,8 +552,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
 
     // Add lineage info
-    workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
-    LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);
+    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
+    source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers);
+    LineageInfo.setSource(source, workUnit);
 
     LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
         offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));
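
Other sources can adopt the same pattern. A hypothetical sketch for an HDFS-backed source follows; only DatasetDescriptor and LineageInfo.setSource come from this commit, while the class name, the "hdfs" platform literal and the metadata key are illustrative.

import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.workunit.WorkUnit;

public class HdfsLineageSketch {
  // Record the lineage source on a work unit, mirroring the KafkaSource change above.
  static void addLineageSourceInfo(WorkUnit workUnit, String inputPath) {
    DatasetDescriptor source = new DatasetDescriptor("hdfs", inputPath);
    source.addMetadata("clusterUri", "hdfs://namenode:8020");  // illustrative metadata key/value
    LineageInfo.setSource(source, workUnit);
  }
}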

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index 57fdedd..e2292c7 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -19,12 +19,13 @@ package org.apache.gobblin.source.extractor.extract.jdbc;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
 import java.io.IOException;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,7 @@ import com.google.gson.JsonElement;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
 import org.apache.gobblin.source.jdbc.MysqlExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
 
 
 /**
@@ -55,12 +57,14 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> {
     return extractor;
   }
 
-  protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
-    super.addLineageSourceInfo(sourceState, entity, workUnit);
+  protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
     String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
     String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
     String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
     String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" + database.trim();
-    LineageInfo.setDatasetLineageAttribute(workUnit, "connectionUrl", connectionUrl);
+    DatasetDescriptor source =
+        new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + entity.getSourceEntityName());
+    source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl);
+    LineageInfo.setSource(source, workUnit);
   }
 }
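
The destination half is not shown in this file: for a complete event to be loaded at commit time, something on the publish path has to record where each branch landed. Below is a hypothetical sketch using only calls that appear elsewhere in this commit (putDestination, addMetadata); the class name, method signature and variable names are illustrative.

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;

public class MysqlDestinationSketch {
  // Record where branch 0 of this task landed so LineageInfo.load() can build a full event.
  static void recordDestination(WorkUnitState state, String database, String table, String connectionUrl) {
    DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + table);
    destination.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl);
    LineageInfo.putDestination(destination, 0, state);
  }
}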

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index d6a1b58..7ff9bb1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -22,24 +22,27 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 
 import org.apache.gobblin.commit.CommitSequence;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.lineage.LineageException;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.event.lineage.LineageException;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.FailureEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
@@ -170,8 +173,7 @@ final class SafeDatasetCommit implements Callable<Void> {
       try {
         finalizeDatasetState(datasetState, datasetUrn);
         maySubmitFailureEvent(datasetState);
-        submitLineageEvent(datasetState.getTaskStates());
-
+        maySubmitLineageEvent(datasetState);
         if (commitSequenceBuilder.isPresent()) {
           buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
           datasetState.setState(JobState.RunningState.COMMITTED);
@@ -197,38 +199,45 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
   }
 
-  private void submitLineageEvent(Collection<TaskState> states) {
-    if (states.size() == 0) {
-      return;
+  private void maySubmitLineageEvent(JobState.DatasetState datasetState) {
+    Collection<TaskState> allStates = datasetState.getTaskStates();
+    Collection<TaskState> states = Lists.newArrayList();
+    // Filter out failed states or states that don't have lineage info
+    for (TaskState state : allStates) {
+      if (state.getWorkingState() == WorkUnitState.WorkingState.COMMITTED &&
+          LineageInfo.hasLineageInfo(state)) {
+        states.add(state);
+      }
     }
-
-    TaskState oneWorkUnitState = states.iterator().next();
-    if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
-      // Do nothing if the dataset is not configured with lineage info
+    if (states.size() == 0) {
       return;
     }
 
     try {
-      // Aggregate states by lineage.dataset.urn, in case datasetUrn may be set to empty so that all task states falls into one empty dataset.
-      // FixMe: once all dataset.urn attribues are set properly, we don't need this aggregation.
-      Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
-      for (Collection<State> dataState: datasetStates) {
-        Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
-        EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
-        for (LineageInfo info: branchLineages) {
-          submitter.submit(info.getId(), info.getLineageMetaData());
+      if (StringUtils.isEmpty(datasetUrn)) {
+        // This dataset may contain different kinds of LineageEvent
+        for (Collection<TaskState> collection : aggregateByLineageEvent(states)) {
+          submitLineageEvent(collection);
         }
+      } else {
+        submitLineageEvent(states);
       }
     } catch (LineageException e) {
-      log.error ("Lineage event submission failed due to :" + e.toString());
+      log.error("Lineage event submission failed due to :" + e.toString());
     } finally {
-      for (TaskState taskState: states) {
-        // Remove lineage info from the state to avoid sending duplicate lineage events in the next run
-        taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
+      // Purge lineage info from all states
+      for (TaskState taskState : allStates) {
+        LineageInfo.purgeLineageInfo(taskState);
       }
     }
   }
 
+  private void submitLineageEvent(Collection<TaskState> states) throws LineageException {
+    Collection<LineageEventBuilder> events = LineageInfo.load(states);
+    // Send events
+    events.forEach(event -> event.submit(metricContext));
+  }
+
   /**
   * Synchronized version of {@link #commitDataset(Collection, DataPublisher)} used when publisher is not
    * thread safe.
@@ -415,4 +424,15 @@ final class SafeDatasetCommit implements Callable<Void> {
     return Optional.of(new DatasetStateCommitStep.Builder<>().withProps(datasetState).withDatasetUrn(datasetUrn)
         .withDatasetState(datasetState).build());
   }
+
+  private static Collection<Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) {
+    Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap();
+    for (TaskState state : states) {
+      String eventName = LineageInfo.getFullEventName(state);
+      Collection<TaskState> statesForEvent = statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList());
+      statesForEvent.add(state);
+    }
+
+    return statesByEvents.values();
+  }
 }
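
As a side note on the filter/purge contract used above: hasLineageInfo gates which committed task states are considered, and purgeLineageInfo strips lineage properties afterwards so the same event is not re-sent on the next run. A small hedged sketch; the State object and its contents are illustrative, only the LineageInfo calls come from this commit.

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;

public class LineagePurgeSketch {
  static void demo() {
    State state = new State();
    LineageInfo.setSource(new DatasetDescriptor("kafka", "someTopic"), state);

    System.out.println(LineageInfo.hasLineageInfo(state));   // true: state carries lineage info
    LineageInfo.purgeLineageInfo(state);                      // stripped after submission
    System.out.println(LineageInfo.hasLineageInfo(state));   // false: nothing left to re-send
  }
}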

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
index 4cdf991..7380257 100644
--- a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
+++ b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
@@ -2,3 +2,5 @@ source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource
 writer.builder.class="org.apache.gobblin.writer.test.GobblinTestEventBusWriter$Builder"
 
 extract.table.type=APPEND_ONLY
+
+data.publisher.final.dir=/tmp

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
index 83a97de..b5f8fef 100644
--- a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
+++ b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
@@ -24,6 +24,6 @@ job.lock.enabled=false
 state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store
 writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging
 writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output
-
+data.publisher.final.dir=/tmp
 
 source.class=org.apache.gobblin.TestSkipWorkUnitsSource

