gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibuen...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-210] Implemented two abstract sources based on dataset finder
Date Thu, 17 Aug 2017 21:40:54 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 280b1d35e -> b54e2818d


[GOBBLIN-210] Implemented two abstract sources based on dataset finder

Closes #2063 from ibuenros/datasetfinder-source


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b54e2818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b54e2818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b54e2818

Branch: refs/heads/master
Commit: b54e2818d2b0861a019dafd0ea62b83f701152ee
Parents: 280b1d3
Author: ibuenros <issac.buenrostro@gmail.com>
Authored: Thu Aug 17 14:40:44 2017 -0700
Committer: Issac Buenrostro <ibuenros@apache.org>
Committed: Thu Aug 17 14:40:44 2017 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/dataset/Dataset.java     |  12 +-
 .../gobblin/dataset/IterableDatasetFinder.java  |  24 ++
 .../gobblin/dataset/PartitionableDataset.java   |  61 +++++
 .../apache/gobblin/dataset/URNIdentified.java   |  29 +++
 .../URNLexicographicalComparator.java           |  53 +++++
 .../dataset/comparators/package-info.java       |   4 +
 .../dataset/test/SimpleDatasetForTesting.java   |  38 ++++
 .../test/SimpleDatasetPartitionForTesting.java  |  35 +++
 .../SimplePartitionableDatasetForTesting.java   |  56 +++++
 .../test/StaticDatasetsFinderForTesting.java    |  61 +++++
 .../source/workunit/BasicWorkUnitStream.java    |  13 +-
 .../management/copy/CloseableFsCopySource.java  |   2 +-
 .../data/management/copy/CopySource.java        |  14 +-
 .../data/management/dataset/DatasetUtils.java   |   9 +
 .../management/source/DatasetFinderSource.java  | 141 ++++++++++++
 .../source/LoopingDatasetFinderSource.java      | 226 +++++++++++++++++++
 .../source/DatasetFinderSourceTest.java         | 137 +++++++++++
 .../source/LoopingDatasetFinderSourceTest.java  | 218 ++++++++++++++++++
 .../apache/gobblin/runtime/task/NoopTask.java   |  60 +++++
 .../org/apache/gobblin/util/HadoopUtils.java    |  20 ++
 20 files changed, 1204 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java
index fb8c1fa..abc225f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java
@@ -20,9 +20,17 @@ package org.apache.gobblin.dataset;
 /**
  * Interface representing a dataset.
  */
-public interface Dataset {
+public interface Dataset extends URNIdentified {
+
   /**
-   * Deepest {@link org.apache.hadoop.fs.Path} that contains all files in the dataset.
+   * URN for this dataset.
+   * @deprecated use {@link #getUrn()}
    */
+  @Deprecated
   public String datasetURN();
+
+  @Override
+  default String getUrn() {
+    return datasetURN();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java
index 93f9586..a842c3c 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java
@@ -18,7 +18,11 @@
 package org.apache.gobblin.dataset;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 
 /**
@@ -30,7 +34,27 @@ public interface IterableDatasetFinder<T extends Dataset> extends DatasetsFinder
   /**
    * @return An {@link Iterator} over the {@link Dataset}s found.
    * @throws IOException
+   * @deprecated use {@link #getDatasetsStream} instead.
    */
+  @Deprecated
   public Iterator<T> getDatasetsIterator() throws IOException;
 
+  /**
+   * Get a stream of {@link Dataset}s found.
+   * @param desiredCharacteristics desired {@link java.util.Spliterator} characteristics of this stream. The returned
+   *                               stream need not satisfy these characteristics, this argument merely implies that the
+   *                               caller will run optimally when those characteristics are present, allowing pushdown of
+   *                               those characteristics. For example {@link java.util.Spliterator#SORTED} can sometimes
+   *                               be pushed down at a cost, so the {@link DatasetsFinder} would only push it down if it is valuable
+   *                               for the caller.
+   * @param suggestedOrder suggested order of the datasets in the stream. Implementation may or may not return the entries
+   *                       in that order. If the entries are in that order, implementation should ensure the spliterator
+   *                       is annotated as such.
+   * @return a stream of {@link Dataset}s found.
+   * @throws IOException
+   */
+  default Stream<T> getDatasetsStream(int desiredCharacteristics, Comparator<T> suggestedOrder) throws IOException {
+    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(getDatasetsIterator(), 0), false);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java
new file mode 100644
index 0000000..06e1ec0
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+
+/**
+ * A {@link Dataset} that can be partitioned into disjoint subsets of the dataset.
+ * @param <T> the type of partitions returned by the dataset.
+ */
+public interface PartitionableDataset<T extends PartitionableDataset.DatasetPartition> extends Dataset {
+
+  /**
+   * Get a stream of partitions.
+   * @param desiredCharacteristics desired {@link java.util.Spliterator} characteristics of this stream. The returned
+   *                               stream need not satisfy these characteristics, this argument merely implies that the
+   *                               caller will run optimally when those characteristics are present, allowing pushdown of
+   *                               those characteristics. For example {@link java.util.Spliterator#SORTED} can sometimes
+   *                               be pushed down at a cost, so the {@link Dataset} would only push it down if it is valuable
+   *                               for the caller.
+   * @param suggestedOrder suggested order of the partitions in the stream. Implementation may or may not return the entries
+   *                       in that order. If the entries are in that order, implementation should ensure the spliterator
+   *                       is annotated as such.
+   * @return a {@link Stream} over {@link DatasetPartition}s in this dataset.
+   */
+  Stream<T> getPartitions(int desiredCharacteristics, Comparator<T> suggestedOrder) throws IOException;
+
+  /**
+   * A partition of a {@link PartitionableDataset}.
+   */
+  interface DatasetPartition extends URNIdentified {
+    /**
+     * URN for this partition.
+     */
+    String getUrn();
+
+    /**
+     * @return Dataset this partition belongs to.
+     */
+    Dataset getDataset();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java
new file mode 100644
index 0000000..b6d5137
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+/**
+ * An object that can be identified by URN.
+ * Note the contract: given objects o1 and o2, o1.equals(o2) iff o1.getClass().equals(o2.getClass()) and o1.getUrn().equals(o2.getUrn()).
+ */
+public interface URNIdentified {
+  /**
+   * URN for this object.
+   */
+  public String getUrn();
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java
new file mode 100644
index 0000000..bef0461
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset.comparators;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.gobblin.dataset.URNIdentified;
+
+import lombok.EqualsAndHashCode;
+
+
+/**
+ * Dataset comparator that compares by dataset urn.
+ */
+@EqualsAndHashCode
+public class URNLexicographicalComparator implements Comparator<URNIdentified>, Serializable {
+  private static final long serialVersionUID = 2647543651352156568L;
+
+  @Override
+  public int compare(URNIdentified o1, URNIdentified o2) {
+    return o1.getUrn().compareTo(o2.getUrn());
+  }
+
+  /**
+   * Compare against a raw URN.
+   */
+  public int compare(URNIdentified o1, String urn2) {
+    return o1.getUrn().compareTo(urn2);
+  }
+
+  /**
+   * Compare against a raw URN.
+   */
+  public int compare(String urn1, URNIdentified o2) {
+    return urn1.compareTo(o2.getUrn());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java
new file mode 100644
index 0000000..fc8fe18
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Contains common dataset orders that {@link org.apache.gobblin.dataset.DatasetsFinder}s can push down.
+ */
+package org.apache.gobblin.dataset.comparators;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java
new file mode 100644
index 0000000..1136ef4
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset.test;
+
+import org.apache.gobblin.dataset.Dataset;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+
+
+/**
+ * A dummy {@link Dataset} used for testing.
+ */
+@AllArgsConstructor
+@EqualsAndHashCode
+public class SimpleDatasetForTesting implements Dataset {
+  private final String urn;
+
+  @Override
+  public String datasetURN() {
+    return this.urn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java
new file mode 100644
index 0000000..92624d9
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset.test;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.PartitionableDataset;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+/**
+ * A dummy {@link org.apache.gobblin.dataset.PartitionableDataset.DatasetPartition} used for testing.
+ */
+@Data
+@EqualsAndHashCode
+public class SimpleDatasetPartitionForTesting implements PartitionableDataset.DatasetPartition {
+  private final String urn;
+  private Dataset dataset;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java
new file mode 100644
index 0000000..849e080
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset.test;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.dataset.PartitionableDataset;
+
+import lombok.EqualsAndHashCode;
+
+
+/**
+ * A {@link PartitionableDataset} that just returns a predefined set of {@link SimpleDatasetPartitionForTesting} used for testing.
+ */
+@EqualsAndHashCode
+public class SimplePartitionableDatasetForTesting implements PartitionableDataset<SimpleDatasetPartitionForTesting> {
+  private final String urn;
+  private final List<SimpleDatasetPartitionForTesting> partitions;
+
+  public SimplePartitionableDatasetForTesting(String urn, List<SimpleDatasetPartitionForTesting> partitions) {
+    this.urn = urn;
+    this.partitions = partitions;
+    for (SimpleDatasetPartitionForTesting partition : this.partitions) {
+      partition.setDataset(this);
+    }
+  }
+
+  @Override
+  public String datasetURN() {
+    return this.urn;
+  }
+
+  @Override
+  public Stream<SimpleDatasetPartitionForTesting> getPartitions(int desiredCharacteristics,
+      Comparator<SimpleDatasetPartitionForTesting> suggestedOrder) throws IOException {
+    return this.partitions.stream();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java
new file mode 100644
index 0000000..71f6add
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset.test;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.hadoop.fs.Path;
+
+import lombok.AllArgsConstructor;
+
+
+/**
+ * A {@link org.apache.gobblin.dataset.DatasetsFinder} that returns a predefined set of {@link Dataset}s for testing.
+ */
+@AllArgsConstructor
+public class StaticDatasetsFinderForTesting implements IterableDatasetFinder<Dataset> {
+
+  private final List<Dataset> datasets;
+
+  @Override
+  public List<Dataset> findDatasets() throws IOException {
+    return this.datasets;
+  }
+
+  @Override
+  public Path commonDatasetRoot() {
+    return null;
+  }
+
+  @Override
+  public Iterator<Dataset> getDatasetsIterator() throws IOException {
+    return this.datasets.iterator();
+  }
+
+  @Override
+  public Stream<Dataset> getDatasetsStream(int desiredCharacteristics, Comparator<Dataset> suggestedOrder)
+      throws IOException {
+    return this.datasets.stream();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java
index 0d07312..86c813e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 import lombok.Getter;
-import lombok.Setter;
 
 
 /**
@@ -120,9 +119,7 @@ public class BasicWorkUnitStream implements WorkUnitStream {
   public static class Builder {
     private Iterator<WorkUnit> workUnits;
     private List<WorkUnit> workUnitList;
-    @Setter
     private boolean finiteStream = true;
-    @Setter
     private boolean safeToMaterialize = false;
 
 
@@ -136,6 +133,16 @@ public class BasicWorkUnitStream implements WorkUnitStream {
       this.finiteStream = true;
     }
 
+    public Builder setFiniteStream(boolean finiteStream) {
+      this.finiteStream = finiteStream;
+      return this;
+    }
+
+    public Builder setSafeToMaterialize(boolean safeToMaterialize) {
+      this.safeToMaterialize = safeToMaterialize;
+      return this;
+    }
+
     public WorkUnitStream build() {
       return new BasicWorkUnitStream(this.workUnits, this.workUnitList, this.finiteStream, this.safeToMaterialize);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java
index f4cf4fa..d71d590 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java
@@ -59,7 +59,7 @@ public class CloseableFsCopySource extends CopySource {
   @Override
   protected FileSystem getSourceFileSystem(State state)
       throws IOException {
-    return this.closer.register(super.getSourceFileSystem(state));
+    return this.closer.register(HadoopUtils.getSourceFileSystem(state));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 5dd7f85..f60e5f0 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -145,8 +145,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
       DeprecationUtils.renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY,
           Lists.newArrayList(MAX_FILES_COPIED_KEY));
 
-      final FileSystem sourceFs = getSourceFileSystem(state);
-      final FileSystem targetFs = getTargetFileSystem(state);
+      final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+      final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
       state.setProp(SlaEventKeys.SOURCE_URI, sourceFs.getUri());
       state.setProp(SlaEventKeys.DESTINATION_URI, targetFs.getUri());
 
@@ -325,7 +325,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
 
     if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
       CopyableFile copyEntity = (CopyableFile) deserializeCopyEntity(state);
-      return extractorForCopyableFile(getSourceFileSystem(state), copyEntity, state);
+      return extractorForCopyableFile(HadoopUtils.getSourceFileSystem(state), copyEntity, state);
     }
     return new EmptyExtractor<>("empty");
   }
@@ -339,6 +339,10 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
   public void shutdown(SourceState state) {
   }
 
+  /**
+   * @deprecated use {@link HadoopUtils#getSourceFileSystem(State)}.
+   */
+  @Deprecated
   protected FileSystem getSourceFileSystem(State state)
       throws IOException {
     Configuration conf = HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
@@ -346,6 +350,10 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state);
   }
 
+  /**
+   * @deprecated use {@link HadoopUtils#getWriterFileSystem(State, int, int)}.
+   */
+  @Deprecated
   private static FileSystem getTargetFileSystem(State state)
       throws IOException {
     return HadoopUtils.getOptionallyThrottledFileSystem(WriterUtils.getWriterFS(state, 1, 0), state);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
index 2c1e954..97dd2d9 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.fs.PathFilter;
 
 import com.google.common.collect.Lists;
 
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.CopyableFileFilter;
 import org.apache.gobblin.dataset.DatasetsFinder;
@@ -90,6 +92,13 @@ public class DatasetUtils {
     }
   }
 
+  public static <T extends org.apache.gobblin.dataset.Dataset> IterableDatasetFinder<T> instantiateIterableDatasetFinder(
+      Properties props, FileSystem fs, String default_class, Object... additionalArgs) throws IOException {
+    DatasetsFinder<T> datasetsFinder = instantiateDatasetFinder(props, fs, default_class, additionalArgs);
+    return datasetsFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<T>) datasetsFinder
+        : new IterableDatasetFinderImpl<>(datasetsFinder);
+  }
+
   /**
    * Instantiate a {@link PathFilter} from the class name at key {@link #PATH_FILTER_KEY} in props passed. If key
    * {@link #PATH_FILTER_KEY} is not set, a default {@link #ACCEPT_ALL_PATH_FILTER} is returned

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
new file mode 100644
index 0000000..38fc7e2
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.PartitionableDataset;
+import org.apache.gobblin.source.WorkUnitStreamSource;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.util.HadoopUtils;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An abstract source that uses a {@link org.apache.gobblin.dataset.DatasetsFinder} to find {@link Dataset}s and creates a
+ * work unit for each one. When {@link #drilldownIntoPartitions} is set, each partition of a
+ * {@link PartitionableDataset} becomes its own work unit instead.
+ */
+@Slf4j
+public abstract class DatasetFinderSource<S, D> implements WorkUnitStreamSource<S, D> {
+
+  /** If true, each partition of a {@link PartitionableDataset} is processed as a separate work unit. */
+  protected final boolean drilldownIntoPartitions;
+
+  /**
+   * @param drilldownIntoPartitions if set to true, will process each partition of a {@link PartitionableDataset} as a
+   *                                separate work unit.
+   */
+  public DatasetFinderSource(boolean drilldownIntoPartitions) {
+    this.drilldownIntoPartitions = drilldownIntoPartitions;
+  }
+
+  /**
+   * @return the {@link WorkUnit} for the input dataset.
+   */
+  protected abstract WorkUnit workUnitForDataset(Dataset dataset);
+
+  /**
+   * @return the {@link WorkUnit} for the input partition.
+   */
+  protected abstract WorkUnit workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition);
+
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    try {
+      return createWorkUnitStream(state).collect(Collectors.toList());
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public WorkUnitStream getWorkunitStream(SourceState state) {
+    try {
+      return new BasicWorkUnitStream.Builder(createWorkUnitStream(state).iterator()).build();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Can be overriden to specify a non-pluggable {@link org.apache.gobblin.dataset.DatasetsFinder}.
+   * @throws IOException if the finder cannot be instantiated
+   */
+  protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException {
+    return DatasetUtils.instantiateIterableDatasetFinder(state.getProperties(),
+        HadoopUtils.getSourceFileSystem(state), null);
+  }
+
+  /**
+   * Builds the work unit stream: one work unit per dataset, or, when {@link #drilldownIntoPartitions} is set, one per
+   * partition. Non-partitionable datasets are wrapped in a {@link DatasetWrapper} so they flow through the same path.
+   */
+  private Stream<WorkUnit> createWorkUnitStream(SourceState state) throws IOException {
+    IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);
+
+    Stream<Dataset> datasetStream = datasetsFinder.getDatasetsStream(0, null);
+
+    if (this.drilldownIntoPartitions) {
+      return datasetStream.flatMap(dataset -> {
+        if (dataset instanceof PartitionableDataset) {
+          try {
+            return (Stream<PartitionableDataset.DatasetPartition>) ((PartitionableDataset) dataset).getPartitions(0,
+                null);
+          } catch (IOException ioe) {
+            // Skip this dataset rather than failing the whole stream, but keep the stack trace in the log.
+            log.error("Failed to get partitions for dataset " + dataset.getUrn(), ioe);
+            return Stream.empty();
+          }
+        } else {
+          return Stream.of(new DatasetWrapper(dataset));
+        }
+      }).map(this::workUnitForPartitionInternal);
+    } else {
+      return datasetStream.map(this::workUnitForDataset);
+    }
+  }
+
+  /** Routes wrapped datasets back to {@link #workUnitForDataset}; real partitions go to {@link #workUnitForDatasetPartition}. */
+  private WorkUnit workUnitForPartitionInternal(PartitionableDataset.DatasetPartition partition) {
+    if (partition instanceof DatasetWrapper) {
+      return workUnitForDataset(((DatasetWrapper) partition).dataset);
+    } else {
+      return workUnitForDatasetPartition(partition);
+    }
+  }
+
+  /**
+   * A wrapper around a {@link org.apache.gobblin.dataset.PartitionableDataset.DatasetPartition} that makes it look
+   * like a {@link Dataset} for slightly easier to understand code.
+   */
+  @AllArgsConstructor
+  protected static class DatasetWrapper implements PartitionableDataset.DatasetPartition {
+    @Getter
+    private final Dataset dataset;
+
+    @Override
+    public String getUrn() {
+      // NOTE(review): assumes Dataset#datasetURN returns the same value as the URNIdentified#getUrn contract — confirm.
+      return this.dataset.datasetURN();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
new file mode 100644
index 0000000..4ca0dcb
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.source;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.PartitionableDataset;
+import org.apache.gobblin.dataset.URNIdentified;
+import org.apache.gobblin.dataset.comparators.URNLexicographicalComparator;
+import org.apache.gobblin.runtime.task.NoopTask;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.PeekingIterator;
+
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A source that processes datasets generated by a {@link org.apache.gobblin.dataset.DatasetsFinder}, processing a few of
+ * them each run, and continuing from where it left off in the next run. When it is done processing all the datasets, it
+ * starts over from the beginning. The datasets are processed in lexicographical order based on URN.
+ *
+ * TODO: handle retries
+ */
+@Slf4j
+public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSource<S, D> {
+
+  public static final String MAX_WORK_UNITS_PER_RUN_KEY = "gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun";
+  public static final int MAX_WORK_UNITS_PER_RUN = 10;
+
+  // Watermark state written into every emitted work unit so the next run can resume after the last processed URN.
+  private static final String DATASET_URN = "gobblin.source.loopingDatasetFinderSource.datasetUrn";
+  private static final String PARTITION_URN = "gobblin.source.loopingDatasetFinderSource.partitionUrn";
+  private static final String WORK_UNIT_ORDINAL = "gobblin.source.loopingDatasetFinderSource.workUnitOrdinal";
+  protected static final String END_OF_DATASETS_KEY = "gobblin.source.loopingDatasetFinderSource.endOfDatasets";
+
+  private final URNLexicographicalComparator lexicographicalComparator = new URNLexicographicalComparator();
+
+  /**
+   * @param drilldownIntoPartitions if set to true, will process each partition of a {@link PartitionableDataset} as a
+   *                                separate work unit.
+   */
+  public LoopingDatasetFinderSource(boolean drilldownIntoPartitions) {
+    super(drilldownIntoPartitions);
+  }
+
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    return Lists.newArrayList(getWorkunitStream(state).getMaterializedWorkUnitCollection());
+  }
+
+  @Override
+  public WorkUnitStream getWorkunitStream(SourceState state) {
+    try {
+      int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);
+
+      // The work unit with the highest ordinal from the previous run carries the resume watermark.
+      List<WorkUnitState> previousWorkUnitStates = state.getPreviousWorkUnitStates();
+      Optional<WorkUnitState> maxWorkUnit;
+      try {
+        maxWorkUnit = previousWorkUnitStates.stream().reduce((wu1, wu2) -> {
+          int wu1Ordinal = wu1.getPropAsInt(WORK_UNIT_ORDINAL);
+          int wu2Ordinal = wu2.getPropAsInt(WORK_UNIT_ORDINAL);
+          return wu1Ordinal > wu2Ordinal ? wu1 : wu2;
+        });
+      } catch (NumberFormatException nfe) {
+        // Preserve the original parse failure as the cause for debuggability.
+        throw new RuntimeException("Work units in state store are corrupted! Missing or malformed " + WORK_UNIT_ORDINAL,
+            nfe);
+      }
+
+      // If the previous run hit the end-of-datasets marker, leave the watermarks null so we loop around.
+      String previousDatasetUrnWatermark = null;
+      String previousPartitionUrnWatermark = null;
+      if (maxWorkUnit.isPresent() && !maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
+        previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN);
+        previousPartitionUrnWatermark = maxWorkUnit.get().getProp(PARTITION_URN);
+      }
+
+      IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);
+
+      Stream<Dataset> datasetStream = datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
+      datasetStream = sortStreamLexicographically(datasetStream);
+
+      return new BasicWorkUnitStream.Builder(new DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark,
+          previousPartitionUrnWatermark, maxWorkUnits)).setFiniteStream(true).build();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * A deep iterator that advances input streams until the correct position, then possibly iterates over partitions
+   * of {@link PartitionableDataset}s.
+   */
+  private class DeepIterator extends AbstractIterator<WorkUnit> {
+    // Kept as a PeekingIterator: advanceUntilLargerThan may peek the first element past the watermark, and that
+    // element must remain available for iteration (a throw-away wrapper would silently drop it).
+    private final PeekingIterator<Dataset> baseIterator;
+    private final int maxWorkUnits;
+
+    private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator;
+    private int generatedWorkUnits = 0;
+
+    public DeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark,
+        String previousPartitionUrnWatermark, int maxWorkUnits) {
+      this.maxWorkUnits = maxWorkUnits;
+      this.baseIterator = Iterators.peekingIterator(baseIterator);
+
+      // Skip datasets already processed in previous runs. If the watermark dataset still exists and is
+      // partitionable, also skip its already-processed partitions.
+      Dataset equalDataset = advanceUntilLargerThan(this.baseIterator, previousDatasetUrnWatermark);
+
+      if (drilldownIntoPartitions && equalDataset instanceof PartitionableDataset) {
+        PeekingIterator<PartitionableDataset.DatasetPartition> partitionIterator =
+            Iterators.peekingIterator(getPartitionIterator((PartitionableDataset) equalDataset));
+        advanceUntilLargerThan(partitionIterator, previousPartitionUrnWatermark);
+        this.currentPartitionIterator = partitionIterator;
+      } else {
+        this.currentPartitionIterator = Iterators.emptyIterator();
+      }
+    }
+
+    /**
+     * Advance an iterator until the next value is larger than the reference. An element equal to the reference is
+     * consumed; the first element larger than the reference is only peeked and stays available to the caller.
+     * @return the last value polled if it is equal to reference, or null otherwise.
+     */
+    @Nullable
+    private <T extends URNIdentified> T advanceUntilLargerThan(PeekingIterator<T> it, String reference) {
+      if (reference == null) {
+        return null;
+      }
+
+      int comparisonResult = -1;
+      while (it.hasNext() && (comparisonResult = lexicographicalComparator.compare(it.peek(), reference)) < 0) {
+        it.next();
+      }
+      return comparisonResult == 0 ? it.next() : null;
+    }
+
+    /** Get the partitions of the input dataset sorted by URN. Returns an empty iterator on failure. */
+    private Iterator<PartitionableDataset.DatasetPartition> getPartitionIterator(PartitionableDataset dataset) {
+      try {
+        return sortStreamLexicographically(
+            dataset.getPartitions(Spliterator.SORTED, LoopingDatasetFinderSource.this.lexicographicalComparator)).iterator();
+      } catch (IOException ioe) {
+        // Skip this dataset's partitions rather than failing the run, but keep the stack trace in the log.
+        log.error("Failed to get partitions for dataset " + dataset.getUrn(), ioe);
+        return Iterators.emptyIterator();
+      }
+    }
+
+    @Override
+    protected WorkUnit computeNext() {
+      if (this.generatedWorkUnits >= this.maxWorkUnits) {
+        return endOfData();
+      }
+
+      while (this.baseIterator.hasNext() || this.currentPartitionIterator.hasNext()) {
+        // Drain the current dataset's partitions before moving on to the next dataset.
+        if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) {
+          PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
+          WorkUnit workUnit = workUnitForDatasetPartition(partition);
+          addDatasetInfoToWorkUnit(workUnit, partition.getDataset(), this.generatedWorkUnits++);
+          addPartitionInfoToWorkUnit(workUnit, partition);
+          return workUnit;
+        }
+
+        Dataset dataset = this.baseIterator.next();
+        if (drilldownIntoPartitions && dataset instanceof PartitionableDataset) {
+          this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset);
+        } else {
+          WorkUnit workUnit = workUnitForDataset(dataset);
+          addDatasetInfoToWorkUnit(workUnit, dataset, this.generatedWorkUnits++);
+          return workUnit;
+        }
+      }
+
+      // All datasets exhausted: emit a no-op marker work unit so the next run knows to loop around.
+      WorkUnit workUnit = NoopTask.noopWorkunit();
+      workUnit.setProp(WORK_UNIT_ORDINAL, this.generatedWorkUnits);
+
+      // Guarantee no further work units are produced after the end-of-datasets marker.
+      this.generatedWorkUnits = Integer.MAX_VALUE;
+
+      workUnit.setProp(END_OF_DATASETS_KEY, true);
+      return workUnit;
+    }
+
+    private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset, int workUnitOrdinal) {
+      workUnit.setProp(DATASET_URN, dataset.getUrn());
+      workUnit.setProp(WORK_UNIT_ORDINAL, workUnitOrdinal);
+    }
+
+    private void addPartitionInfoToWorkUnit(WorkUnit workUnit, PartitionableDataset.DatasetPartition partition) {
+      workUnit.setProp(PARTITION_URN, partition.getUrn());
+    }
+  }
+
+  /**
+   * Sort input stream lexicographically. Noop if the input stream is already sorted by the same comparator.
+   */
+  private <T extends URNIdentified> Stream<T> sortStreamLexicographically(Stream<T> inputStream) {
+    Spliterator<T> spliterator = inputStream.spliterator();
+    // Spliterator#getComparator returns null for naturally-ordered SORTED spliterators, so compare from our side
+    // to avoid a NullPointerException.
+    if (spliterator.hasCharacteristics(Spliterator.SORTED)
+        && this.lexicographicalComparator.equals(spliterator.getComparator())) {
+      return StreamSupport.stream(spliterator, false);
+    }
+    return StreamSupport.stream(spliterator, false).sorted(this.lexicographicalComparator);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/DatasetFinderSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/DatasetFinderSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/DatasetFinderSourceTest.java
new file mode 100644
index 0000000..0e34b7a
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/DatasetFinderSourceTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.PartitionableDataset;
+import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
+import org.apache.gobblin.dataset.test.SimpleDatasetPartitionForTesting;
+import org.apache.gobblin.dataset.test.SimplePartitionableDatasetForTesting;
+import org.apache.gobblin.dataset.test.StaticDatasetsFinderForTesting;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+
+/** Tests for {@link DatasetFinderSource} with and without partition drilldown. */
+public class DatasetFinderSourceTest {
+
+  public static final String DATASET_URN = "test.datasetUrn";
+  public static final String PARTITION_URN = "test.partitionUrn";
+
+  @Test
+  public void testNonDrilledDown() {
+
+    Dataset simpleDataset1 = new SimpleDatasetForTesting("dataset1");
+    Dataset partitionedDataset = new SimplePartitionableDatasetForTesting("dataset2",
+        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2")));
+    Dataset simpleDataset2 = new SimpleDatasetForTesting("dataset3");
+
+    IterableDatasetFinder finder =
+        new StaticDatasetsFinderForTesting(Lists.newArrayList(simpleDataset1, partitionedDataset, simpleDataset2));
+
+    MySource source = new MySource(false, finder);
+    List<WorkUnit> units = source.getWorkunits(new SourceState());
+
+    // Without drilldown the partitioned dataset contributes a single work unit and no partition URN is set.
+    Assert.assertEquals(units.size(), 3);
+    Assert.assertEquals(units.get(0).getProp(DATASET_URN), "dataset1");
+    Assert.assertNull(units.get(0).getProp(PARTITION_URN));
+    Assert.assertEquals(units.get(1).getProp(DATASET_URN), "dataset2");
+    Assert.assertNull(units.get(1).getProp(PARTITION_URN));
+    Assert.assertEquals(units.get(2).getProp(DATASET_URN), "dataset3");
+    Assert.assertNull(units.get(2).getProp(PARTITION_URN));
+
+    // The streaming API must yield exactly the same work units as the materialized API.
+    WorkUnitStream stream = source.getWorkunitStream(new SourceState());
+    Assert.assertEquals(Lists.newArrayList(stream.getWorkUnits()), units);
+  }
+
+  @Test
+  public void testDrilledDown() {
+    Dataset simpleDataset1 = new SimpleDatasetForTesting("dataset1");
+    Dataset partitionedDataset = new SimplePartitionableDatasetForTesting("dataset2",
+        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2")));
+    Dataset simpleDataset2 = new SimpleDatasetForTesting("dataset3");
+
+    IterableDatasetFinder finder =
+        new StaticDatasetsFinderForTesting(Lists.newArrayList(simpleDataset1, partitionedDataset, simpleDataset2));
+
+    MySource source = new MySource(true, finder);
+    List<WorkUnit> units = source.getWorkunits(new SourceState());
+
+    // With drilldown each partition of dataset2 becomes its own work unit.
+    Assert.assertEquals(units.size(), 4);
+    Assert.assertEquals(units.get(0).getProp(DATASET_URN), "dataset1");
+    Assert.assertNull(units.get(0).getProp(PARTITION_URN));
+    Assert.assertEquals(units.get(1).getProp(DATASET_URN), "dataset2");
+    Assert.assertEquals(units.get(1).getProp(PARTITION_URN), "p1");
+    Assert.assertEquals(units.get(2).getProp(DATASET_URN), "dataset2");
+    Assert.assertEquals(units.get(2).getProp(PARTITION_URN), "p2");
+    Assert.assertEquals(units.get(3).getProp(DATASET_URN), "dataset3");
+    Assert.assertNull(units.get(3).getProp(PARTITION_URN));
+
+    WorkUnitStream stream = source.getWorkunitStream(new SourceState());
+    Assert.assertEquals(Lists.newArrayList(stream.getWorkUnits()), units);
+  }
+
+  /** Minimal concrete {@link DatasetFinderSource} backed by a fixed dataset finder. */
+  public static class MySource extends DatasetFinderSource<String, String> {
+    private final IterableDatasetFinder datasetsFinder;
+
+    public MySource(boolean drilldownIntoPartitions, IterableDatasetFinder datasetsFinder) {
+      super(drilldownIntoPartitions);
+      this.datasetsFinder = datasetsFinder;
+    }
+
+    @Override
+    public Extractor<String, String> getExtractor(WorkUnitState state) throws IOException {
+      return null;
+    }
+
+    @Override
+    protected WorkUnit workUnitForDataset(Dataset dataset) {
+      WorkUnit unit = new WorkUnit();
+      unit.setProp(DATASET_URN, dataset.getUrn());
+      return unit;
+    }
+
+    @Override
+    protected WorkUnit workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition) {
+      WorkUnit unit = new WorkUnit();
+      unit.setProp(DATASET_URN, partition.getDataset().getUrn());
+      unit.setProp(PARTITION_URN, partition.getUrn());
+      return unit;
+    }
+
+    @Override
+    public void shutdown(SourceState state) {
+      // Nothing to clean up.
+    }
+
+    @Override
+    protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException {
+      return this.datasetsFinder;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
new file mode 100644
index 0000000..76fe172
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.PartitionableDataset;
+import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
+import org.apache.gobblin.dataset.test.SimpleDatasetPartitionForTesting;
+import org.apache.gobblin.dataset.test.SimplePartitionableDatasetForTesting;
+import org.apache.gobblin.dataset.test.StaticDatasetsFinderForTesting;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link LoopingDatasetFinderSource}: verifies that work units are emitted in lexicographical URN order,
+ * that each run resumes after the highest-ordinal work unit of the previous run, and that processing loops back to
+ * the first dataset once the end-of-datasets marker has been consumed.
+ */
+public class LoopingDatasetFinderSourceTest {
+
+  @Test
+  public void testNonDrilldown() {
+    // Datasets are deliberately handed to the finder in reverse order; the source must sort them by URN.
+    Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
+    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2")));
+    Dataset dataset3 = new SimpleDatasetForTesting("dataset3");
+    Dataset dataset4 = new SimpleDatasetForTesting("dataset4");
+    Dataset dataset5 = new SimpleDatasetForTesting("dataset5");
+
+    IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(
+        Lists.newArrayList(dataset5, dataset4, dataset3, dataset2, dataset1));
+
+    MySource mySource = new MySource(false, finder);
+
+    // Limit each run to 3 work units.
+    SourceState sourceState = new SourceState();
+    sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 3);
+
+    // First run picks up the first three datasets in URN order.
+    WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState);
+    List<WorkUnit> workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 3);
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1");
+    Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2");
+    Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3");
+    Assert.assertNull(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN));
+
+    // Second run should continue where it left off
+    List<WorkUnitState> workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
+    SourceState sourceStateSpy = Mockito.spy(sourceState);
+    Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates();
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    // Only two datasets remain; the third work unit is the end-of-datasets marker.
+    Assert.assertEquals(workUnits.size(), 3);
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset4");
+    Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset5");
+    Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertTrue(workUnits.get(2).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+
+    // Loop around
+    workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
+    Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates();
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 3);
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1");
+    Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2");
+    Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3");
+    Assert.assertNull(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN));
+  }
+
+  @Test
+  public void testDrilldown() {
+    // Create three datasets, two of them partitioned
+    Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
+    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2",
+        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"),
+            new SimpleDatasetPartitionForTesting("p2"), new SimpleDatasetPartitionForTesting("p3")));
+    Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3",
+        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"),
+            new SimpleDatasetPartitionForTesting("p2"), new SimpleDatasetPartitionForTesting("p3")));
+
+    IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(
+        Lists.newArrayList(dataset3, dataset2, dataset1));
+
+    MySource mySource = new MySource(true, finder);
+
+    // Limit to 3 wunits per run
+    SourceState sourceState = new SourceState();
+    sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 3);
+
+    // first run, get three first work units
+    WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState);
+    List<WorkUnit> workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 3);
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1");
+    Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2");
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN), "p1");
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2");
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN), "p2");
+
+    // Second run should continue where it left off
+    List<WorkUnitState> workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
+    SourceState sourceStateSpy = Mockito.spy(sourceState);
+    Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates();
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    // Resumes mid-dataset: picks up at dataset2/p3, then crosses into dataset3's partitions.
+    Assert.assertEquals(workUnits.size(), 3);
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2");
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN), "p3");
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3");
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN), "p1");
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3");
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN), "p2");
+
+    // third run, continue from where it left off
+    workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
+    Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates();
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 2);
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3");
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN), "p3");
+    Assert.assertTrue(workUnits.get(1).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+
+    // fourth run, finished all work units, loop around
+    workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
+    Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates();
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 3);
+    Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1");
+    Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2");
+    Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN), "p1");
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2");
+    Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN), "p2");
+  }
+
+  /** Minimal concrete {@link LoopingDatasetFinderSource} backed by a fixed dataset finder. */
+  public static class MySource extends LoopingDatasetFinderSource<String, String> {
+    private final IterableDatasetFinder datasetsFinder;
+
+    public MySource(boolean drilldownIntoPartitions, IterableDatasetFinder datasetsFinder) {
+      super(drilldownIntoPartitions);
+      this.datasetsFinder = datasetsFinder;
+    }
+
+    @Override
+    public Extractor<String, String> getExtractor(WorkUnitState state) throws IOException {
+      return null;
+    }
+
+    @Override
+    protected WorkUnit workUnitForDataset(Dataset dataset) {
+      WorkUnit workUnit = new WorkUnit();
+      workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, dataset.getUrn());
+      return workUnit;
+    }
+
+    @Override
+    protected WorkUnit workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition) {
+      WorkUnit workUnit = new WorkUnit();
+      workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, partition.getDataset().getUrn());
+      workUnit.setProp(DatasetFinderSourceTest.PARTITION_URN, partition.getUrn());
+      return workUnit;
+    }
+
+    @Override
+    public void shutdown(SourceState state) {
+
+    }
+
+    @Override
+    protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException {
+      return this.datasetsFinder;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/NoopTask.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/NoopTask.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/NoopTask.java
new file mode 100644
index 0000000..8bbb6e2
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/NoopTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.task;
+
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * A task that does nothing. Usually used for transferring state from one job to the next.
+ */
+public class NoopTask extends BaseAbstractTask {
+
+  /**
+   * @return A {@link WorkUnit} that will run a {@link NoopTask}.
+   */
+  public static WorkUnit noopWorkunit() {
+    WorkUnit workUnit = new WorkUnit();
+    TaskUtils.setTaskFactoryClass(workUnit, Factory.class);
+    return workUnit;
+  }
+
+  /**
+   * The factory for a {@link NoopTask}.
+   */
+  public static class Factory implements TaskFactory {
+    @Override
+    public TaskIFace createTask(TaskContext taskContext) {
+      return new NoopTask(taskContext);
+    }
+
+    @Override
+    public DataPublisher createDataPublisher(JobState.DatasetState datasetState) {
+      return new NoopPublisher(datasetState);
+    }
+  }
+
+  private NoopTask(TaskContext taskContext) {
+    super(taskContext);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index b1bd6cc..8d186a6 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URI;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map.Entry;
@@ -920,4 +921,23 @@ public class HadoopUtils {
   public static void addGobblinSite() {
     Configuration.addDefaultResource("gobblin-site.xml");
   }
+
+  /**
+   * Get a {@link FileSystem} object for the uri specified at {@link ConfigurationKeys#SOURCE_FILEBASED_FS_URI}.
+   * @throws IOException
+   */
+  public static FileSystem getSourceFileSystem(State state) throws IOException {
+    Configuration conf = HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
+    String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
+    return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state);
+  }
+
+  /**
+   * Get a {@link FileSystem} object for the uri specified at {@link ConfigurationKeys#WRITER_FILE_SYSTEM_URI}.
+   * @throws IOException
+   */
+  public static FileSystem getWriterFileSystem(State state, int numBranches, int branchId)
+      throws IOException {
+    return HadoopUtils.getOptionallyThrottledFileSystem(WriterUtils.getWriterFS(state, numBranches, branchId), state);
+  }
 }


Mime
View raw message