accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject svn commit: r1373452 - in /accumulo/trunk/core/src: main/java/org/apache/accumulo/core/client/mapreduce/ main/java/org/apache/accumulo/core/file/rfile/ test/java/org/apache/accumulo/core/client/mapreduce/
Date Wed, 15 Aug 2012 15:13:13 GMT
Author: billie
Date: Wed Aug 15 15:13:12 2012
New Revision: 1373452

URL: http://svn.apache.org/viewvc?rev=1373452&view=rev
Log:
ACCUMULO-467 made file properties configurable for AccumuloFileOutputFormat

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1373452&r1=1373451&r2=1373452&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
Wed Aug 15 15:13:12 2012
@@ -46,7 +46,6 @@ import org.apache.hadoop.mapreduce.lib.o
 public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
   private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName();
   public static final String FILE_TYPE = PREFIX + ".file_type";
-  public static final String BLOCK_SIZE = PREFIX + ".block_size";
   
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String INSTANCE_NAME = PREFIX + ".instanceName";
@@ -61,7 +60,6 @@ public class AccumuloFileOutputFormat ex
     if (extension == null || extension.isEmpty())
       extension = RFile.EXTENSION;
     
-    handleBlockSize(job.getConfiguration());
     final Path file = this.getDefaultWorkFile(job, "." + extension);
     
     return new RecordWriter<Key,Value>() {
@@ -84,28 +82,16 @@ public class AccumuloFileOutputFormat ex
     };
   }
   
-  protected static void handleBlockSize(Configuration conf) {
-    int blockSize;
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) {
-      blockSize = (int) new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)).getConfiguration().getMemoryInBytes(
-          Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
-    } else {
-      blockSize = getBlockSize(conf);
-    }
-    conf.setInt("io.seqfile.compress.blocksize", blockSize);
-    
-  }
-  
   public static void setFileType(Configuration conf, String type) {
     conf.set(FILE_TYPE, type);
   }
   
+  /**
+   * @deprecated since 1.5, use {@link #setCompressedBlockSize(Configuration, long)} instead
+   */
   public static void setBlockSize(Configuration conf, int blockSize) {
-    conf.setInt(BLOCK_SIZE, blockSize);
-  }
-  
-  private static int getBlockSize(Configuration conf) {
-    return conf.getInt(BLOCK_SIZE, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+    long bs = blockSize;
+    setCompressedBlockSize(conf, bs);
   }
   
   /**
@@ -130,4 +116,24 @@ public class AccumuloFileOutputFormat ex
   protected static Instance getInstance(Configuration conf) {
     return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
   }
+  
+  public static void setReplication(Configuration conf, int replication) {
+    conf.setInt(Property.TABLE_FILE_REPLICATION.getKey(), replication);
+  }
+  
+  public static void setDFSBlockSize(Configuration conf, long blockSize) {
+    conf.setLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), blockSize);
+  }
+  
+  public static void setCompressedBlockSize(Configuration conf, long cblockSize) {
+    conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), cblockSize);
+  }
+  
+  public static void setCompressedBlockSizeIndex(Configuration conf, long cblockSizeIndex)
{
+    conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), cblockSizeIndex);
+  }
+  
+  public static void setCompressionType(Configuration conf, String compression) {
+    conf.set(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), compression);
+  }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java?rev=1373452&r1=1373451&r2=1373452&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
Wed Aug 15 15:13:12 2012
@@ -104,22 +104,23 @@ public class RFileOperations extends Fil
   @Override
   public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration
acuconf) throws IOException {
     int hrep = conf.getInt("dfs.replication", -1);
-    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+    int trep = conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION));
     int rep = hrep;
     if (trep > 0 && trep != hrep) {
       rep = trep;
     }
     long hblock = conf.getLong("dfs.block.size", 1 << 26);
-    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+    long tblock = conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
     long block = hblock;
     if (tblock > 0)
       block = tblock;
     int bufferSize = conf.getInt("io.file.buffer.size", 4096);
     
-    long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
-    long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+    long blockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+    long indexBlockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(),
+        acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
     
-    String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
+    String compression = conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
     
     CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file),
false, bufferSize, (short) rep, block), compression, conf);
     Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1373452&r1=1373451&r2=1373452&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
(original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Wed Aug 15 15:13:12 2012
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.ContextFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -59,17 +60,6 @@ public class AccumuloFileOutputFormatTes
   }
   
   @Test
-  public void testSet() throws IOException, InterruptedException {
-    AccumuloFileOutputFormat.setBlockSize(job.getConfiguration(), 300);
-    validate(300);
-  }
-  
-  @Test
-  public void testUnset() throws IOException, InterruptedException {
-    validate((int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
-  }
-  
-  @Test
   public void testEmptyWrite() throws IOException, InterruptedException {
     handleWriteTests(false);
   }
@@ -97,10 +87,28 @@ public class AccumuloFileOutputFormatTes
     file.getFileSystem(tac.getConfiguration()).delete(file.getParent(), true);
   }
   
-  public void validate(int size) throws IOException, InterruptedException {
-    AccumuloFileOutputFormat.handleBlockSize(job.getConfiguration());
-    int detSize = job.getConfiguration().getInt("io.seqfile.compress.blocksize", -1);
-    assertEquals(size, detSize);
+  @Test
+  public void validateConfiguration() throws IOException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+    
+    int a = 7;
+    long b = 300l;
+    long c = 50l;
+    long d = 10l;
+    String e = "type";
+    
+    AccumuloFileOutputFormat.setReplication(conf, a);
+    AccumuloFileOutputFormat.setDFSBlockSize(conf, b);
+    AccumuloFileOutputFormat.setCompressedBlockSize(conf, c);
+    AccumuloFileOutputFormat.setCompressedBlockSizeIndex(conf, d);
+    AccumuloFileOutputFormat.setCompressionType(conf, e);
+    
+    assertEquals(a, conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION)));
+    assertEquals(b, conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)));
+    assertEquals(c, conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)));
+    assertEquals(d,
+        conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)));
+    assertEquals(e, conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)));
   }
-  
 }



Mime
View raw message