gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibuen...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-200] Add cleanable state store dataset
Date Fri, 29 Sep 2017 17:39:13 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e67799948 -> 70cbe91b9


[GOBBLIN-200] Add cleanable state store dataset

Closes #2097 from jack-moseley/state_store_cleaner


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/70cbe91b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/70cbe91b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/70cbe91b

Branch: refs/heads/master
Commit: 70cbe91b9ff2e3d83dae8f36c4abb4a2cbbc425b
Parents: e677999
Author: Jack Moseley <jmoseley@linkedin.com>
Authored: Fri Sep 29 10:39:06 2017 -0700
Committer: Issac Buenrostro <ibuenros@apache.org>
Committed: Fri Sep 29 10:39:06 2017 -0700

----------------------------------------------------------------------
 .../data/management/dataset/DummyDataset.java   |  3 +-
 .../retention/dataset/CleanableDataset.java     |  6 +-
 .../dataset/CleanableDatasetStoreDataset.java   | 59 +++++++++++++++++
 .../retention/dataset/CleanableHiveDataset.java |  3 +-
 .../dataset/TimeBasedDatasetStoreDataset.java   | 57 ++++++++++++++++
 .../TimeBasedDatasetStoreDatasetFinder.java     | 49 ++++++++++++++
 .../version/DatasetStateStoreVersion.java       | 30 +++++++++
 .../TimestampedDatasetStateStoreVersion.java    | 52 +++++++++++++++
 ...mestampedDatasetStateStoreVersionFinder.java | 51 +++++++++++++++
 .../CleanableDatasetStoreDatasetTest.java       | 69 ++++++++++++++++++++
 .../main/resources/state-store-retention.pull   | 25 +++++++
 .../metadata/StateStoreEntryManager.java        |  2 +-
 .../CleanableHivePartitionDataset.java          |  3 +-
 13 files changed, 401 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java
index 3d7cc7d..0c0ab35 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -37,7 +38,7 @@ import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;
  * Dummy {@link Dataset} that does nothing.
  */
 @RequiredArgsConstructor
-public class DummyDataset implements CopyableDataset, CleanableDataset {
+public class DummyDataset implements CopyableDataset, CleanableDataset, FileSystemDataset {
 
   private final Path datasetRoot;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java
index 033d0aa..b2e55c4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java
@@ -17,17 +17,15 @@
 
 package org.apache.gobblin.data.management.retention.dataset;
 
-import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.dataset.FileSystemDataset;
-
 import java.io.IOException;
+import org.apache.gobblin.dataset.Dataset;
 
 
 /**
  * An abstraction for a set of files where a simple {@link org.apache.gobblin.data.management.retention.policy.RetentionPolicy}
  * can be applied.
  */
-public interface CleanableDataset extends Dataset, FileSystemDataset {
+public interface CleanableDataset extends Dataset {
 
   /**
    * Cleans the {@link CleanableDataset}. In general, this means to apply a

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java
new file mode 100644
index 0000000..9f29c9a
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.retention.dataset;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
+import org.apache.gobblin.data.management.version.DatasetStateStoreVersion;
+import org.apache.gobblin.data.management.version.DatasetVersion;
+import org.apache.gobblin.data.management.version.finder.VersionFinder;
+import org.apache.gobblin.metastore.DatasetStoreDataset;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import com.google.common.collect.Lists;
+
+
+/**
+ * A cleanable {@link DatasetStoreDataset}
+ */
+public abstract class CleanableDatasetStoreDataset<T extends DatasetVersion> extends DatasetStoreDataset implements CleanableDataset {
+
+  public CleanableDatasetStoreDataset(DatasetStoreDataset.Key key, List<DatasetStateStoreEntryManager> entries) {
+    super(key, entries);
+  }
+
+  public abstract VersionFinder<? extends T> getVersionFinder();
+
+  public abstract VersionSelectionPolicy<T> getVersionSelectionPolicy();
+
+  @Override
+  public void clean() throws IOException {
+
+    List<T> versions = Lists.newArrayList(this.getVersionFinder().findDatasetVersions(this));
+
+    Collections.sort(versions, Collections.reverseOrder());
+
+    Collection<T> deletableVersions = this.getVersionSelectionPolicy().listSelectedVersions(versions);
+
+    for (Object version : deletableVersions) {
+      ((DatasetStateStoreVersion) version).getEntry().delete();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java
index 282a2a6..97cec4c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -66,7 +67,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Slf4j
 @SuppressWarnings({ "rawtypes", "unchecked" })
 @Getter
-public class CleanableHiveDataset extends HiveDataset implements CleanableDataset {
+public class CleanableHiveDataset extends HiveDataset implements CleanableDataset, FileSystemDataset {
 
   private static final String SHOULD_DELETE_DATA_KEY = "gobblin.retention.hive.shouldDeleteData";
   private static final String SHOULD_DELETE_DATA_DEFAULT = Boolean.toString(false);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
new file mode 100644
index 0000000..0fa7c18
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.retention.dataset;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy;
+import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
+import org.apache.gobblin.data.management.version.TimestampedDatasetStateStoreVersion;
+import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+import org.apache.gobblin.data.management.version.finder.TimestampedDatasetStateStoreVersionFinder;
+import org.apache.gobblin.data.management.version.finder.VersionFinder;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.util.ConfigUtils;
+import lombok.Data;
+
+
+/**
+ * A {@link CleanableDatasetStoreDataset} that deletes entries before a certain time
+ */
+@Data
+public class TimeBasedDatasetStoreDataset extends CleanableDatasetStoreDataset<TimestampedDatasetVersion> {
+
+  private final VersionFinder<TimestampedDatasetStateStoreVersion> versionFinder;
+  private final VersionSelectionPolicy<TimestampedDatasetVersion> versionSelectionPolicy;
+
+  public TimeBasedDatasetStoreDataset(Key key, List<DatasetStateStoreEntryManager> entries, Properties props) {
+    super(key, entries);
+    this.versionFinder = new TimestampedDatasetStateStoreVersionFinder();
+    this.versionSelectionPolicy = new SelectBeforeTimeBasedPolicy(ConfigUtils.propertiesToConfig(props));
+  }
+
+  @Override
+  public VersionFinder<TimestampedDatasetStateStoreVersion> getVersionFinder() {
+    return this.versionFinder;
+  }
+
+  @Override
+  public VersionSelectionPolicy<TimestampedDatasetVersion> getVersionSelectionPolicy() {
+    return this.versionSelectionPolicy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java
new file mode 100644
index 0000000..eeb92aa
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.retention.dataset.finder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.gobblin.data.management.retention.dataset.CleanableDatasetStoreDataset;
+import org.apache.gobblin.data.management.retention.dataset.TimeBasedDatasetStoreDataset;
+import org.apache.gobblin.metastore.DatasetStoreDataset;
+import org.apache.gobblin.metastore.DatasetStoreDatasetFinder;
+import org.apache.hadoop.fs.FileSystem;
+
+
+/**
+ * A {@link DatasetStoreDatasetFinder} that returns {@link CleanableDatasetStoreDataset}
+ */
+public class TimeBasedDatasetStoreDatasetFinder extends DatasetStoreDatasetFinder {
+
+  private Properties props;
+
+  public TimeBasedDatasetStoreDatasetFinder(FileSystem fs, Properties props) throws IOException {
+    super(fs, props);
+    this.props = props;
+  }
+
+  @Override
+  public List<DatasetStoreDataset> findDatasets() throws IOException {
+    return super.findDatasets().stream()
+                .map(dataset -> new TimeBasedDatasetStoreDataset(dataset.getKey(), dataset.getDatasetStateStoreMetadataEntries(), props))
+                .collect(Collectors.toList());
+      }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java
new file mode 100644
index 0000000..47c161d
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.version;
+
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+
+
+/**
+ * {@link DatasetVersion} that has a {@link DatasetStateStoreEntryManager}
+ */
+public interface DatasetStateStoreVersion extends DatasetVersion {
+
+  DatasetStateStoreEntryManager getEntry();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
new file mode 100644
index 0000000..45a153e
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.version;
+
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.joda.time.DateTime;
+import lombok.Getter;
+
+/**
+ * {@link TimestampedDatasetVersion} that has a {@link DatasetStateStoreEntryManager}
+ */
+@Getter
+public class TimestampedDatasetStateStoreVersion extends TimestampedDatasetVersion implements DatasetStateStoreVersion {
+
+  private final DatasetStateStoreEntryManager entry;
+
+  public TimestampedDatasetStateStoreVersion(DatasetStateStoreEntryManager entry) {
+    super(new DateTime(entry.getTimestamp()), null);
+    this.entry = entry;
+  }
+
+  @Override
+  public int compareTo(FileSystemDatasetVersion other) {
+    TimestampedDatasetVersion otherAsDateTime = (TimestampedDatasetVersion) other;
+    return this.version.equals(otherAsDateTime.version) ? 0 : this.version.compareTo(otherAsDateTime.version);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return this.version.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java
new file mode 100644
index 0000000..3f9b0b5
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.version.finder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
+import org.apache.gobblin.data.management.version.TimestampedDatasetStateStoreVersion;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.metastore.DatasetStoreDataset;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import com.google.common.collect.Lists;
+
+
+/**
+ * {@link VersionFinder} for {@link TimestampedDatasetStateStoreVersion}
+ */
+public class TimestampedDatasetStateStoreVersionFinder implements VersionFinder<TimestampedDatasetStateStoreVersion> {
+
+  @Override
+  public Class<? extends FileSystemDatasetVersion> versionClass() {
+    return TimestampedDatasetStateStoreVersion.class;
+  }
+
+  @Override
+  public Collection<TimestampedDatasetStateStoreVersion> findDatasetVersions(Dataset dataset) throws IOException {
+    DatasetStoreDataset storeDataset = ((DatasetStoreDataset) dataset);
+    List<TimestampedDatasetStateStoreVersion> versions = Lists.newArrayList();
+
+    for (DatasetStateStoreEntryManager entry : storeDataset.getDatasetStateStoreMetadataEntries()) {
+      versions.add(new TimestampedDatasetStateStoreVersion(entry));
+    }
+    return versions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java
new file mode 100644
index 0000000..43dbd8a
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.retention;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;
+import org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder;
+import org.apache.gobblin.metastore.DatasetStoreDataset;
+import org.apache.gobblin.runtime.FsDatasetStateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import com.google.common.io.Files;
+
+
+/**
+ * Unit test for {@link org.apache.gobblin.data.management.retention.dataset.CleanableDatasetStoreDataset}
+ */
+public class CleanableDatasetStoreDatasetTest {
+  @Test
+  public void testCleanStateStore() throws IOException {
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+
+    FsDatasetStateStore store = new FsDatasetStateStore(fs, tmpDir.getAbsolutePath());
+
+    store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id1"));
+    store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id2"));
+    store.persistDatasetState("dataset1", new JobState.DatasetState("job2", "job2_id1"));
+    store.persistDatasetState("", new JobState.DatasetState("job3", "job3_id1"));
+
+    Properties props = new Properties();
+
+    props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.getAbsolutePath());
+    props.setProperty("selection.timeBased.lookbackTime", "0m");
+
+    TimeBasedDatasetStoreDatasetFinder datasetFinder = new TimeBasedDatasetStoreDatasetFinder(fs, props);
+    List<DatasetStoreDataset> datasets = datasetFinder.findDatasets();
+
+    for (DatasetStoreDataset dataset : datasets) {
+      ((CleanableDataset) dataset).clean();
+      File jobDir = new File(tmpDir.getAbsolutePath(), dataset.getKey().getStoreName());
+      Assert.assertEquals(jobDir.list().length, 1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-example/src/main/resources/state-store-retention.pull
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/state-store-retention.pull b/gobblin-example/src/main/resources/state-store-retention.pull
new file mode 100644
index 0000000..f8920cc
--- /dev/null
+++ b/gobblin-example/src/main/resources/state-store-retention.pull
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+type=hadoopJava
+job.class=org.apache.gobblin.data.management.retention.DatasetCleanerJob
+
+gobblin.retention.dataset.finder.class=org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder
+selection.timeBased.lookbackTime=30m
+
+state.store.fs.uri=hdfs://localhost:9000
+state.store.dir=example/state-store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
index b2fb04c..c4c7796 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
@@ -41,7 +41,7 @@ public abstract class StateStoreEntryManager<T extends State> {
   /** {@link StateStore} where this entry exists. */
   private final StateStore stateStore;
 
-  private final long getTimestamp() {
+  public final long getTimestamp() {
     if (this.timestamp <= 0) {
       throw new RuntimeException("Timestamp is not reliable.");
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java
index c8898f3..67ed73a 100644
--- a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java
+++ b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -48,7 +49,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  * @author adsharma
  */
 @Slf4j
-public class CleanableHivePartitionDataset extends HivePartitionDataset implements CleanableDataset {
+public class CleanableHivePartitionDataset extends HivePartitionDataset implements CleanableDataset, FileSystemDataset {
   private FileSystem fs;
   private State state;
 


Mime
View raw message