gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-264] Add a SharedResourceFactory for creating shared DataPublishers
Date Thu, 28 Sep 2017 16:59:26 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ae0ba2815 -> 8284bb76b


[GOBBLIN-264] Add a SharedResourceFactory for creating shared DataPublishers

Closes #2116 from htran1/shareable_publishers


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8284bb76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8284bb76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8284bb76

Branch: refs/heads/master
Commit: 8284bb76bac89c0e15186dbf75717e6ca831eab0
Parents: ae0ba28
Author: Hung Tran <hutran@linkedin.com>
Authored: Thu Sep 28 09:59:21 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Thu Sep 28 09:59:21 2017 -0700

----------------------------------------------------------------------
 .../apache/gobblin/capability/Capability.java   |  40 ++++++
 .../gobblin/capability/CapabilityAware.java     |  38 ++++++
 .../apache/gobblin/publisher/DataPublisher.java |  14 +-
 .../gobblin/publisher/DataPublisherFactory.java |  92 +++++++++++++
 .../gobblin/publisher/DataPublisherKey.java     |  64 +++++++++
 .../publisher/DataPublisherFactoryTest.java     | 134 +++++++++++++++++++
 .../broker/ImmediatelyInvalidResourceEntry.java |  56 ++++++++
 .../apache/gobblin/broker/ResourceInstance.java |  15 ++-
 .../org/apache/gobblin/util/ParallelRunner.java |  80 ++++++-----
 9 files changed, 501 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java b/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java
new file mode 100644
index 0000000..02ee89a
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/capability/Capability.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.capability;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import lombok.Data;
+
+/**
+ * Represents a set of functionality a job-creator can ask for. Examples could include
+ * encryption, compression, partitioning...
+ *
+ * A Capability has a name and a "critical" flag; both take part in equals/hashCode via Lombok's {@code @Data}.
+ * Capability-specific settings (e.g. the encryption algorithm) go to {@link CapabilityAware#supportsCapability}.
+ */
+@Alpha
+@Data
+public class Capability {
+  /** Capability advertised by objects that are safe to share across threads. */
+  public static final Capability THREADSAFE = new Capability("THREADSAFE", false);
+
+  private final String name;
+  // NOTE(review): the meaning of "critical" is not defined in this change; presumably it marks a capability the
+  // requester cannot do without. Confirm and document.
+  private final boolean critical;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java
b/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java
new file mode 100644
index 0000000..2b56469
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/capability/CapabilityAware.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.capability;
+
+import java.util.Map;
+
+import org.apache.gobblin.annotation.Alpha;
+
+/**
+ * Describes an object that can report which {@link Capability capabilities} it supports.
+ */
+@Alpha
+public interface CapabilityAware {
+  /**
+   * Checks if this object supports the given Capability with the given properties.
+   *
+   * Implementers of this should always check if their super-class may happen to support a capability
+   * before returning false!
+   * @param c Capability being queried
+   * @param properties capability-specific properties that qualify the query (may be empty)
+   * @return True if this object supports the given capability + property settings, false if not
+   */
+  boolean supportsCapability(Capability c, Map<String, Object> properties);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
index a190d4a..4a551df 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
@@ -21,7 +21,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.Collection;
+import java.util.Map;
 
+import org.apache.gobblin.capability.Capability;
+import org.apache.gobblin.capability.CapabilityAware;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -30,7 +33,11 @@ import org.apache.gobblin.configuration.WorkUnitState;
 /**
  * Defines how to publish data and its corresponding metadata. Can be used for either task
level or job level publishing.
  */
-public abstract class DataPublisher implements Closeable {
+public abstract class DataPublisher implements Closeable, CapabilityAware {
+  /**
+   * Reusable capability. NOTE(review): what "reusable" promises to callers is not defined in this change; confirm.
+   */
+  public static final Capability REUSABLE = new Capability("REUSABLE", false);
 
   protected final State state;
 
@@ -125,4 +132,9 @@ public abstract class DataPublisher implements Closeable {
   protected boolean shouldPublishMetadataFirst() {
     return true;
   }
+
+  @Override
+  public boolean supportsCapability(Capability c, Map<String, Object> properties) {
+    return false; // base publishers claim no capabilities; subclasses override to advertise e.g. THREADSAFE
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
new file mode 100644
index 0000000..4e565ad
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.publisher;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.gobblin.broker.ImmediatelyInvalidResourceEntry;
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.capability.Capability;
+import org.apache.gobblin.configuration.State;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link SharedResourceFactory} for creating {@link DataPublisher}s.
+ *
+ * The factory creates a {@link DataPublisher} with the publisher class name and state. Publishers that support
+ * {@link Capability#THREADSAFE} are returned as broker-cacheable resources and may be shared; all other publishers
+ * are handed out once, and the recipient is responsible for closing them.
+ */
+@Slf4j
+public class DataPublisherFactory<S extends ScopeType<S>>
+    implements SharedResourceFactory<DataPublisher, DataPublisherKey, S> {
+
+  public static final String FACTORY_NAME = "dataPublisher";
+
+  /**
+   * Gets a {@link DataPublisher} of class {@code publisherClassName} from {@code broker}.
+   *
+   * @param publisherClassName fully qualified name of a {@link DataPublisher} subclass
+   * @param state state used to instantiate the publisher when a new instance has to be created
+   * @param broker broker that creates, and possibly caches, the publisher
+   * @return the publisher; instances supporting {@link Capability#THREADSAFE} may be shared with other callers
+   * @throws IOException wrapping a {@link NotConfiguredException} raised by the broker
+   */
+  public static <S extends ScopeType<S>> DataPublisher get(String publisherClassName, State state,
+      SharedResourcesBroker<S> broker) throws IOException {
+    try {
+      return broker.getSharedResource(new DataPublisherFactory<S>(),
+          new DataPublisherKey(publisherClassName, state));
+    } catch (NotConfiguredException nce) {
+      throw new IOException(nce);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  /**
+   * Instantiates the publisher named by the key. Threadsafe publishers are wrapped in a cacheable
+   * {@link ResourceInstance}; others in an {@link ImmediatelyInvalidResourceEntry} the broker hands out once.
+   *
+   * @throws RuntimeException if loading or reflectively instantiating the publisher class fails
+   * @throws ClassCastException if the named class is not a {@link DataPublisher}
+   */
+  @Override
+  public SharedResourceFactoryResponse<DataPublisher> createResource(SharedResourcesBroker<S> broker,
+      ScopedConfigView<S, DataPublisherKey> config) throws NotConfiguredException {
+    DataPublisherKey key = config.getKey();
+    String publisherClassName = key.getPublisherClassName();
+    State state = key.getState();
+    try {
+      // asSubclass validates the loaded type up front instead of relying on an unchecked cast.
+      Class<? extends DataPublisher> dataPublisherClass =
+          Class.forName(publisherClassName).asSubclass(DataPublisher.class);
+
+      DataPublisher publisher = DataPublisher.getInstance(dataPublisherClass, state);
+
+      // If the publisher is threadsafe then it is shareable, so return it as a resource instance that may be cached
+      // by the broker. Otherwise, it is not shareable, so return it as an immediately invalidated resource that will
+      // only be returned once from the broker.
+      if (publisher.supportsCapability(Capability.THREADSAFE, Collections.<String, Object>emptyMap())) {
+        return new ResourceInstance<>(publisher);
+      } else {
+        return new ImmediatelyInvalidResourceEntry<>(publisher);
+      }
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Failed to create data publisher " + publisherClassName, e);
+    }
+  }
+
+  /**
+   * Defaults to the root scope of the broker's scope type, so threadsafe publishers are shared with sub-brokers
+   * unless a narrower scope is requested explicitly.
+   */
+  @Override
+  public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, DataPublisherKey> config) {
+    return broker.selfScope().getType().rootScope();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java
new file mode 100644
index 0000000..fca1ba1
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherKey.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.publisher;
+
+import java.util.Objects;
+
+import org.apache.gobblin.broker.iface.SharedResourceKey;
+import org.apache.gobblin.configuration.State;
+
+import lombok.Getter;
+
+
+/**
+ * {@link SharedResourceKey} for requesting {@link DataPublisher}s from a
+ * {@link org.apache.gobblin.broker.iface.SharedResourceFactory}.
+ *
+ * <p>Only the publisher class name takes part in {@link #equals(Object)} and {@link #hashCode()}; the {@link State}
+ * is carried along so that a publisher can be constructed when the broker needs a new instance.
+ * NOTE(review): a cached (threadsafe) publisher is therefore returned even for a key with a different state;
+ * confirm this is intended.
+ */
+@Getter
+public class DataPublisherKey implements SharedResourceKey {
+  private final String publisherClassName;
+  private final State state;
+
+  public DataPublisherKey(String publisherClassName, State state) {
+    this.publisherClassName = publisherClassName;
+    this.state = state;
+  }
+
+  /** Uses the publisher class name as this key's configuration key. */
+  @Override
+  public String toConfigurationKey() {
+    return this.publisherClassName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    return Objects.equals(this.publisherClassName, ((DataPublisherKey) o).publisherClassName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(this.publisherClassName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
new file mode 100644
index 0000000..b2cd739
--- /dev/null
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.publisher;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.SimpleScope;
+import org.apache.gobblin.broker.SimpleScopeType;
+import org.apache.gobblin.broker.iface.NoSuchScopeException;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.capability.Capability;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+
+/**
+ * Tests for {@link DataPublisherFactory}.
+ */
+public class DataPublisherFactoryTest {
+
+  @Test
+  public void testGetNonThreadSafePublisher()
+      throws IOException {
+    SharedResourcesBroker<SimpleScopeType> broker =
+        SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(ConfigFactory.empty(),
+            SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    DataPublisher publisher1 = DataPublisherFactory.get(TestNonThreadsafeDataPublisher.class.getName(), null, broker);
+    DataPublisher publisher2 = DataPublisherFactory.get(TestNonThreadsafeDataPublisher.class.getName(), null, broker);
+
+    // should get different publisher instances
+    Assert.assertNotSame(publisher1, publisher2);
+
+    // Check capabilities
+    Assert.assertTrue(publisher1.supportsCapability(DataPublisher.REUSABLE, Collections.<String, Object>emptyMap()));
+    Assert.assertFalse(publisher1.supportsCapability(Capability.THREADSAFE, Collections.<String, Object>emptyMap()));
+  }
+
+  @Test
+  public void testGetThreadSafePublisher()
+      throws IOException, NotConfiguredException, NoSuchScopeException {
+    SharedResourcesBroker<SimpleScopeType> broker =
+        SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(ConfigFactory.empty(),
+            SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    SharedResourcesBroker<SimpleScopeType> localBroker1 =
+        broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local1")).build();
+
+    DataPublisher publisher1 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
+    DataPublisher publisher2 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
+
+    // should get the same publisher instance
+    Assert.assertSame(publisher1, publisher2);
+
+    DataPublisher publisher3 =
+        localBroker1.getSharedResource(new DataPublisherFactory<>(),
+            new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null));
+
+    // should get the same publisher instance, since the factory auto-scopes to the root (global) scope
+    Assert.assertSame(publisher2, publisher3);
+
+    DataPublisher publisher4 =
+        localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(),
+            new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null), SimpleScopeType.LOCAL);
+
+    // should get a different publisher instance, since it was requested at the LOCAL scope
+    Assert.assertNotSame(publisher3, publisher4);
+
+    // Check capabilities
+    Assert.assertTrue(publisher1.supportsCapability(DataPublisher.REUSABLE, Collections.<String, Object>emptyMap()));
+    Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.<String, Object>emptyMap()));
+  }
+
+  private static class TestNonThreadsafeDataPublisher extends DataPublisher {
+    public TestNonThreadsafeDataPublisher(State state) {
+      super(state);
+    }
+
+    @Override
+    public void initialize() throws IOException {
+    }
+
+    @Override
+    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
+    }
+
+    @Override
+    public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public boolean supportsCapability(Capability c, Map<String, Object> properties) {
+      return c == DataPublisher.REUSABLE;
+    }
+  }
+
+  private static class TestThreadsafeDataPublisher extends TestNonThreadsafeDataPublisher {
+    public TestThreadsafeDataPublisher(State state) {
+      super(state);
+    }
+
+    @Override
+    public boolean supportsCapability(Capability c, Map<String, Object> properties) {
+      return (c == Capability.THREADSAFE || c == DataPublisher.REUSABLE);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
new file mode 100644
index 0000000..b3f8502
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.broker;
+
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link ResourceEntry} that expires immediately: it reports itself as valid only until {@link #getResource()}
+ * is first called, so that a new resource is created for the next request.
+ *
+ * The resource is not closed on invalidation since the lifetime of the object cannot be determined by the cache, so
+ * the recipient of the resource needs to close it.
+ */
+@Slf4j
+@EqualsAndHashCode(callSuper = true)
+public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> {
+  // volatile so that invalidation by the thread that fetched the resource is visible to threads calling isValid()
+  private volatile boolean valid;
+
+  public ImmediatelyInvalidResourceEntry(T resource) {
+    super(resource);
+    this.valid = true;
+  }
+
+  /**
+   * Marks this entry invalid and then returns the resource, so that a new one will be created on the next
+   * request from the factory.
+   */
+  @Override
+  public T getResource() {
+    this.valid = false;
+    return super.getResource();
+  }
+
+  @Override
+  public boolean isValid() {
+    return this.valid;
+  }
+
+  /**
+   * Intentionally a no-op: the resource's lifetime cannot be determined here, so its recipient must close it.
+   */
+  @Override
+  public void onInvalidate() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java
b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java
index e0f8611..b5f04c3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ResourceInstance.java
@@ -26,7 +26,20 @@ import lombok.Data;
  */
 @Data
 public class ResourceInstance<T> implements ResourceEntry<T> {
-  private final T resource;
+  // Note: this field is named theResource rather than resource to avoid a collision between the Lombok getter
+  // and the getResource() method defined in {@link ResourceEntry}. Such a collision causes unintended side effects
+  // when getResource() is overridden with additional logic, because Lombok-generated code such as toString()
+  // would then run that logic whenever it reads this field through the getter.
+  private final T theResource;
+
+  /**
+   * Returns the wrapped resource. Subclasses may override this to run additional logic before returning it.
+   * @return the resource
+   */
+  @Override
+  public T getResource() {
+    return getTheResource();
+  }
 
   @Override
   public boolean isValid() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8284bb76/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
index c112d5b..e2f4964 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.util;
 
 import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -83,7 +85,14 @@ public class ParallelRunner implements Closeable {
   public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;
 
   private final ExecutorService executor;
-  private final FileSystem fs;
+
+  /**
+   * Setting of fs is allowed to support reusing the {@link ParallelRunner} with different {@link FileSystem}s
+   * after all tasks have completed execution (e.g. once {@link #waitForTasks()} returns); access is unsynchronized.
+   */
+  @Getter
+  @Setter
+  private FileSystem fs;
 
   private final List<NamedFuture> futures = Lists.newArrayList();
 
@@ -341,38 +350,49 @@ public class ParallelRunner implements Closeable {
     this.futures.add(new NamedFuture(this.executor.submit(callable), name));
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Wait for all submitted tasks to complete. The {@link ParallelRunner} can be reused afterwards, even if this throws.
+   * @throws IOException if any task failed or the wait was interrupted, and the fail policy is FAIL_ONE_FAIL_ALL
+   */
+  public void waitForTasks() throws IOException {
     // Wait for all submitted tasks to complete
-    try {
-      boolean wasInterrupted = false;
-      IOException exception = null;
-      for (NamedFuture future : this.futures) {
-        try {
-          if (wasInterrupted) {
-            future.getFuture().cancel(true);
-          } else {
-            future.getFuture().get();
-          }
-        } catch (InterruptedException ie) {
-          LOGGER.warn("Task was interrupted: " + future.getName());
-          wasInterrupted = true;
-          if (exception == null) {
-            exception = new IOException(ie);
-          }
-        } catch (ExecutionException ee) {
-          LOGGER.warn("Task failed: " + future.getName(), ee.getCause());
-          if (exception == null) {
-            exception = new IOException(ee.getCause());
-          }
+    boolean wasInterrupted = false;
+    IOException exception = null;
+    for (NamedFuture future : this.futures) {
+      try {
+        if (wasInterrupted) {
+          future.getFuture().cancel(true);
+        } else {
+          future.getFuture().get();
+        }
+      } catch (InterruptedException ie) {
+        LOGGER.warn("Task was interrupted: " + future.getName());
+        wasInterrupted = true;
+        if (exception == null) {
+          exception = new IOException(ie);
+        }
+      } catch (ExecutionException ee) {
+        LOGGER.warn("Task failed: " + future.getName(), ee.getCause());
+        if (exception == null) {
+          exception = new IOException(ee.getCause());
         }
       }
-      if (wasInterrupted) {
-        Thread.currentThread().interrupt();
-      }
-      if (exception != null && this.failPolicy == FailPolicy.FAIL_ONE_FAIL_ALL) {
-        throw exception;
-      }
+    }
+    if (wasInterrupted) {
+      Thread.currentThread().interrupt();
+    }
+    // clear before possibly throwing, so more tasks can be submitted to this ParallelRunner even after a failure
+    this.futures.clear();
+
+    if (exception != null && this.failPolicy == FailPolicy.FAIL_ONE_FAIL_ALL) {
+      throw exception;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      waitForTasks();
     } finally {
       ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(LOGGER));
     }


Mime
View raw message