incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1189287 - in /incubator/accumulo/trunk/src/core/src: main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Date Wed, 26 Oct 2011 16:14:46 GMT
Author: vines
Date: Wed Oct 26 16:14:45 2011
New Revision: 1189287

URL: http://svn.apache.org/viewvc?rev=1189287&view=rev
Log:
re accumulo-55 - deletes empty files on close, with tests. Hopefully I didn't break the build
this time around.

Modified:
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
    incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1189287&r1=1189286&r2=1189287&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
Wed Oct 26 16:14:45 2011
@@ -56,27 +56,32 @@ public class AccumuloFileOutputFormat ex
   @Override
   public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext job) throws IOException,
InterruptedException {
     // get the path of the temporary output file
-    Configuration conf = job.getConfiguration();
+    final Configuration conf = job.getConfiguration();
     
     String extension = conf.get(FILE_TYPE);
     if (extension == null || extension.isEmpty()) extension = RFile.EXTENSION;
     
     handleBlockSize(job);
-    Path file = this.getDefaultWorkFile(job, "." + extension);
+    final Path file = this.getDefaultWorkFile(job, "." + extension);
     
     final FileSKVWriter out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf),
conf,
         AccumuloConfiguration.getDefaultConfiguration());
     out.startDefaultLocalityGroup();
     
     return new RecordWriter<Key,Value>() {
+      private boolean hasData = false;
+      
       @Override
       public void write(Key key, Value value) throws IOException {
         out.append(key, value);
+        hasData = true;
       }
       
       @Override
       public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
         out.close();
+        if (!hasData)
+          file.getFileSystem(conf).delete(file, false);
       }
     };
   }

Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1189287&r1=1189286&r2=1189287&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
(original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Wed Oct 26 16:14:45 2011
@@ -16,30 +16,52 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloFileOutputFormatTest {
-  JobContext job;
-  TaskAttemptContext tac;
+  static JobContext job;
+  static TaskAttemptContext tac;
+  static Path f = null;
   
   @Before
   public void setup() {
     job = new JobContext(new Configuration(), new JobID());
+
+    Path file = new Path(System.getenv("ACCUMULO_HOME")+"/target/");
+    f = new Path(file,"_temporary");
+    job.getConfiguration().set("mapred.output.dir", file.toString());
+    
     tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
   }
   
+  @After
+  public void teardown() throws IOException
+  {
+    if (f!= null && f.getFileSystem(job.getConfiguration()).exists(f))
+    {
+      f.getFileSystem(job.getConfiguration()).delete(f, true);
+    }
+  }
+  
   @Test
   public void testSet() throws IOException, InterruptedException {
     AccumuloFileOutputFormat.setBlockSize(job, 300);
@@ -51,6 +73,34 @@ public class AccumuloFileOutputFormatTes
     validate((int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
   }
   
+  @Test
+  public void testEmptyWrite() throws IOException, InterruptedException {
+    handleWriteTests(false);
+  }
+
+  @Test
+  public void testRealWrite() throws IOException, InterruptedException {
+    handleWriteTests(true);
+  }
+  
+  public void handleWriteTests(boolean content) throws IOException, InterruptedException
{
+    AccumuloFileOutputFormat afof = new AccumuloFileOutputFormat();
+    RecordWriter<Key, Value> rw = afof.getRecordWriter(tac);
+    
+    if (content)
+      rw.write(new Key("Key"), new Value("".getBytes()));
+    
+    Path file = afof.getDefaultWorkFile(tac, ".rf");
+    System.out.println(file);
+    rw.close(tac);
+    
+    if (content)
+      assertTrue(file.getFileSystem(job.getConfiguration()).exists(file));
+    else
+      assertFalse(file.getFileSystem(job.getConfiguration()).exists(file));
+    file.getFileSystem(tac.getConfiguration()).delete(file.getParent(), true);
+  }
+  
   public void validate(int size) throws IOException, InterruptedException {
     AccumuloFileOutputFormat.handleBlockSize(job);
     int detSize = job.getConfiguration().getInt("io.seqfile.compress.blocksize", -1);



Mime
View raw message