cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [2/6] git commit: Add the ability to cleanup files after CqlBulkRecordWriter succeeds
Date Tue, 14 Oct 2014 20:44:18 GMT
Add the ability to cleanup files after CqlBulkRecordWriter succeeds

Patch by Paul Pak, reviewed by Piotr Kołaczkowski for CASSANDRA-7777


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c0ee30e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c0ee30e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c0ee30e

Branch: refs/heads/cassandra-2.1
Commit: 6c0ee30ea7c57066aa15c25a40a217ca83aff248
Parents: 781018c
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Tue Oct 14 15:42:43 2014 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Tue Oct 14 15:42:43 2014 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/cql3/CqlBulkOutputFormat.java     | 11 +++++++++++
 .../cassandra/hadoop/cql3/CqlBulkRecordWriter.java     | 13 ++++++++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c0ee30e/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 58e05b6..887fe8e 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -53,6 +53,7 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
   
     private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
     private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+    private static final String DELETE_SOURCE = "cassandra.output.delete.source";
   
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -103,4 +104,14 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
         }
         return insert;
     }
+    
+    public static void setDeleteSourceOnSuccess(Configuration conf, boolean deleteSrc)
+    {
+        conf.setBoolean(DELETE_SOURCE, deleteSrc);
+    }
+    
+    public static boolean getDeleteSourceOnSuccess(Configuration conf)
+    {
+        return conf.getBoolean(DELETE_SOURCE, false);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c0ee30e/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 7a75bb4..43e3a12 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.streaming.StreamState;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
@@ -57,6 +59,7 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
     private String schema;
     private String insertStatement;
     private File outputDir;
+    private boolean deleteSrc;
 
     CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
     {
@@ -84,6 +87,7 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
         schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
         insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
         outputDir = getColumnFamilyDirectory();
+        deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
     }
 
     
@@ -107,7 +111,14 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
                 
                 externalClient.addKnownCfs(keyspace, schema);
 
-                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler());
+                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
+                    @Override
+                    public void onSuccess(StreamState finalState)
+                    {
+                        if (deleteSrc)
+                            FileUtils.deleteRecursive(outputDir);
+                    }
+                };
             }
         }
         catch (Exception e)


Mime
View raw message