gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-222] Fix silent failure for loading incompatible state-store
Date Tue, 29 Aug 2017 23:46:26 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b67fd71b1 -> 90be15f47


[GOBBLIN-222] Fix silent failure for loading incompatible state-store

Fix silent failure for loading incompatible
state-store

Fix the test failure: the executor doesn't handle
the exception properly, so we add a
logAndThrowFailures function in IteratorExecutor

Rename the shim layer package but keep it
temporarily;
added a runtime package-name changing routine in
gobblin

Resolving conflicts

Resolve merge conflicts

Remove unnecessary XML changes caused by IntelliJ's
renaming; fixed findbugsMain

change the shim layer package name back for safety

Closes #2073 from autumnust/deserialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/90be15f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/90be15f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/90be15f4

Branch: refs/heads/master
Commit: 90be15f470b1427c935fe11c6595b9f8184536ec
Parents: b67fd71
Author: Lei Sun <autumnust@gmail.com>
Authored: Tue Aug 29 16:46:18 2017 -0700
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Tue Aug 29 16:46:18 2017 -0700

----------------------------------------------------------------------
 conf/log4j-compaction.xml                       |  2 +-
 conf/log4j-mapreduce.xml                        |  4 +-
 .../extractor/CheckpointableWatermark.java      |  2 +-
 .../apache/gobblin/metastore/FsStateStore.java  | 13 +++---
 .../local/gobblin-oozie-example-workflow.xml    |  2 +-
 .../gobblin/runtime/FsDatasetStateStore.java    | 22 +++++++--
 .../runtime/FsDatasetStateStoreTest.java        | 26 +++++++++++
 .../util/executors/IteratorExecutor.java        | 31 +++++++++++++
 .../util/hadoop/GobblinSequenceFileReader.java  | 49 ++++++++++++++++++++
 9 files changed, 137 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/conf/log4j-compaction.xml
----------------------------------------------------------------------
diff --git a/conf/log4j-compaction.xml b/conf/log4j-compaction.xml
index 8b42725..cfce89c 100644
--- a/conf/log4j-compaction.xml
+++ b/conf/log4j-compaction.xml
@@ -36,7 +36,7 @@
     </layout>
   </appender>
 
-  <logger name="gobblin.compaction" additivity="false">
+  <logger name="org.apache.gobblin.compaction" additivity="false">
     <level value="info" />
     <appender-ref ref="console" />
   </logger>

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/conf/log4j-mapreduce.xml
----------------------------------------------------------------------
diff --git a/conf/log4j-mapreduce.xml b/conf/log4j-mapreduce.xml
index 5ab248c..826493a 100644
--- a/conf/log4j-mapreduce.xml
+++ b/conf/log4j-mapreduce.xml
@@ -42,12 +42,12 @@
     </layout>
   </appender>
 
-  <logger name="gobblin.runtime" additivity="false">
+  <logger name="org.apache.gobblin.runtime" additivity="false">
     <level value="INFO"/>
     <appender-ref ref="FileRoll" />
   </logger>
 
-  <logger name="gobblin.runtime.mapreduce.CliMRJobLauncher" additivity="false">
+  <logger name="org.apache.gobblin.runtime.mapreduce.CliMRJobLauncher" additivity="false">
     <level value="ERROR"/>
     <appender-ref ref="Console" />
   </logger>

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java
index d257e5c..7d2fbab 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/CheckpointableWatermark.java
@@ -38,4 +38,4 @@ public interface CheckpointableWatermark extends Watermark, Comparable<Checkpoin
    */
   ComparableWatermark getWatermark();
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
index 96c3e51..54dbdd7 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
@@ -19,13 +19,12 @@ package org.apache.gobblin.metastore;
 
 import static org.apache.gobblin.util.HadoopUtils.FS_SCHEMES_NON_ATOMIC;
 
-import com.google.common.base.Predicate;
-import org.apache.gobblin.util.HadoopUtils;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,13 +33,15 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.WritableShimSerialization;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
+import com.google.common.base.Predicate;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.WritableShimSerialization;
 
 
 /**
@@ -225,7 +226,7 @@ public class FsStateStore<T extends State> implements StateStore<T>
{
     Closer closer = Closer.create();
     try {
       @SuppressWarnings("deprecation")
-      SequenceFile.Reader reader = closer.register(new SequenceFile.Reader(this.fs, tablePath,
this.conf));
+      GobblinSequenceFileReader reader = closer.register(new GobblinSequenceFileReader(this.fs,
tablePath, this.conf));
       try {
         Text key = new Text();
         T state = this.stateClass.newInstance();
@@ -260,7 +261,7 @@ public class FsStateStore<T extends State> implements StateStore<T>
{
     Closer closer = Closer.create();
     try {
       @SuppressWarnings("deprecation")
-      SequenceFile.Reader reader = closer.register(new SequenceFile.Reader(this.fs, tablePath,
this.conf));
+      GobblinSequenceFileReader reader = closer.register(new GobblinSequenceFileReader(this.fs,
tablePath, this.conf));
       try {
         Text key = new Text();
         T state = this.stateClass.newInstance();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml
----------------------------------------------------------------------
diff --git a/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml b/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml
index 637ea74..aac84ed 100644
--- a/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml
+++ b/gobblin-oozie/src/test/resources/local/gobblin-oozie-example-workflow.xml
@@ -27,7 +27,7 @@
 					<value>true</value>
 				</property>
 			</configuration>
-			<main-class>gobblin.runtime.local.CliLocalJobLauncher</main-class>
+			<main-class>org.apache.gobblin.runtime.local.CliLocalJobLauncher</main-class>
 			<arg>--jobconfig</arg>
 			<arg>${nameNode}/path/to/jobconfig.properties</arg>
 			<arg>--sysconfig</arg>

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
index fa35921..9da34d7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.metastore.predicates.StateStorePredicate;
 import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
 import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
 import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -234,8 +235,24 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState>
imp
 
     Configuration deserializeConfig = new Configuration(this.conf);
     WritableShimSerialization.addToHadoopConfiguration(deserializeConfig);
-    try (@SuppressWarnings("deprecation") SequenceFile.Reader reader = new SequenceFile.Reader(this.fs,
tablePath,
+    try (@SuppressWarnings("deprecation") GobblinSequenceFileReader reader = new GobblinSequenceFileReader(this.fs,
tablePath,
         deserializeConfig)) {
+
+      /**
+       * Add this change so that all stateful flow will have back compatibility.
+       * Shim layer of state store is therefore avoided because of this change.
+       * Keep the implementation of Shim layer temporarily.
+       */
+     String className = reader.getValueClassName();
+     if (className.startsWith("gobblin")) {
+       LOGGER.warn("There's old JobState with no apache package name being read while we
cast them at runtime");
+       className = "org.apache." + className;
+     }
+
+      if (!className.equals(JobState.class.getName()) && !className.equals(JobState.DatasetState.class.getName()))
{
+        throw new RuntimeException("There is a mismatch in the Class Type of state in state-store
and that in runtime");
+      }
+
       // This is necessary for backward compatibility as existing jobs are using the JobState
class
       Object writable = reader.getValueClass() == JobState.class ? new JobState() : new JobState.DatasetState();
 
@@ -255,7 +272,6 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState>
imp
         throw new IOException(e);
       }
     }
-
     return states;
   }
 
@@ -320,7 +336,7 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState>
imp
               ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOGGER), Optional.of("GetFsDatasetStateStore-")))
               .executeAndGetResults();
       int maxNumberOfErrorLogs = 10;
-      IteratorExecutor.logFailures(results, LOGGER, maxNumberOfErrorLogs);
+      IteratorExecutor.logAndThrowFailures(results, LOGGER, maxNumberOfErrorLogs);
     } catch (InterruptedException e) {
       throw new IOException("Failed to get latest dataset states.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
index 1ff6a07..edeca5b 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.metastore.StateStore;
 import com.google.common.io.Files;
 
 
+
 /**
  * Unit tests for {@link FsDatasetStateStore}.
  *
@@ -181,6 +182,31 @@ public class FsDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getDuration(), 1000);
   }
 
+  /**
+   * Loading previous statestore without apache package name.
+   *
+   * Specifically the example used here is the state store generated from previous gobblin-kafka
version without
+   * changing the package name into apache-intialized.
+   *
+   * Should pass the test even the class name doesn't match given the change in
+   * @throws IOException
+   */
+  @Test
+  public void testGetPreviousDatasetStatesByUrnsNoApache() throws IOException{
+    String JOB_NAME_FOR_INCOMPATIBLE_STATE_STORE = "test_failing_job";
+
+    FsDatasetStateStore _fsDatasetStateStore =
+        new FsDatasetStateStore(ConfigurationKeys.LOCAL_FS_URI,
+            "gobblin-runtime/src/test/resources/datasetState");
+
+    try {
+      Map<String, JobState.DatasetState> datasetStatesByUrns =
+          _fsDatasetStateStore.getLatestDatasetStatesByUrns(JOB_NAME_FOR_INCOMPATIBLE_STATE_STORE);
+    } catch (RuntimeException re){
+      Assert.fail("Loading of state store should not fail.");
+    }
+  }
+
   @Test
   public void testGetMetadataForTables() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java
index 13dc93e..2ca271e 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/executors/IteratorExecutor.java
@@ -169,4 +169,35 @@ public class IteratorExecutor<T> {
     }
   }
 
+  /**
+   * Log failures in the output of {@link #executeAndGetResults()}, and also propagate exception
to upper layer.
+   * @param results output of {@link #executeAndGetResults()}
+   * @param useLogger logger to log the messages into.
+   * @param atMost will log at most this many errors.
+   */
+  public static <T> void logAndThrowFailures(List<Either<T, ExecutionException>>
results, Logger useLogger, int atMost) {
+    Logger actualLogger = useLogger == null ? log : useLogger;
+    Iterator<Either<T, ExecutionException>> it = results.iterator();
+    int printed = 0;
+    ExecutionException exc = null;
+
+    while (it.hasNext()) {
+      Either<T, ExecutionException> nextResult = it.next();
+      if (nextResult instanceof Either.Right) {
+        exc = ((Either.Right<T, ExecutionException>) nextResult).getRight();
+        actualLogger.error("Iterator executor failure.", exc);
+        printed++;
+        if (printed >= atMost) {
+          break;
+        }
+      }
+    }
+
+    /**
+     * Throw any exception that Executor ran into.
+     */
+    if (printed > 0) {
+      throw new RuntimeException(exc);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90be15f4/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java
new file mode 100644
index 0000000..c619fa5
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/GobblinSequenceFileReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.hadoop;
+
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+
+/**
+ * Override the {@link SequenceFile.Reader} mainly to
+ * override the {@link SequenceFile.Reader {@link #getValueClassName()}} so that
+ * we can handle the package name issue properly.
+ */
+@Slf4j
+public class GobblinSequenceFileReader extends SequenceFile.Reader {
+  public GobblinSequenceFileReader(FileSystem fs, Path file,
+      Configuration conf) throws IOException {
+    super(fs, file, conf);
+  }
+
+  /** Returns the name of the value class. */
+  public String getValueClassName() {
+    if (super.getValueClassName().startsWith("gobblin.")) {
+      log.info("[We have]   " + super.getValueClassName());
+      return "org.apache." + super.getValueClassName();
+    }
+
+    return super.getValueClassName();
+  }
+}


Mime
View raw message