crunch-commits mailing list archives

From jwi...@apache.org
Subject [1/2] CRUNCH-276: Various static checks and FindBugs fixes. Contributed by Sean Owen.
Date Tue, 15 Oct 2013 04:21:33 GMT
Updated Branches:
  refs/heads/master 68cf3bd3c -> e0dbebf15


http://git-wip-us.apache.org/repos/asf/crunch/blob/e0dbebf1/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index aac9e317..ed21911 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -74,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -85,7 +86,7 @@ public class HFileTargetIT implements Serializable {
   private static final Path TEMP_DIR = new Path("/tmp");
   private static int tableCounter = 0;
 
-  private static FilterFn<String> SHORT_WORD_FILTER = new FilterFn<String>() {
+  private static final FilterFn<String> SHORT_WORD_FILTER = new FilterFn<String>() {
     @Override
     public boolean accept(String input) {
       return input.length() <= 2;
@@ -219,7 +220,7 @@ public class HFileTargetIT implements Serializable {
     }
   }
 
-  /** @seealso CRUNCH-251 */
+  /** See CRUNCH-251 */
   @Test
   public void testMultipleHFileTargets() throws Exception {
     Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
@@ -258,7 +259,7 @@ public class HFileTargetIT implements Serializable {
   @Test
   public void testHFileUsesFamilyConfig() throws IOException {
     DataBlockEncoding newBlockEncoding = DataBlockEncoding.PREFIX;
-    assertTrue(newBlockEncoding != DataBlockEncoding.valueOf(HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING));
+    assertNotSame(newBlockEncoding, DataBlockEncoding.valueOf(HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING));
 
     Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
     Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
@@ -299,7 +300,7 @@ public class HFileTargetIT implements Serializable {
     assertTrue(hfilesCount > 0);
   }
 
-  private PCollection<Put> convertToPuts(PTable<String, Long> in) {
+  private static PCollection<Put> convertToPuts(PTable<String, Long> in) {
     return in.parallelDo(new MapFn<Pair<String, Long>, Put>() {
       @Override
       public Put map(Pair<String, Long> input) {
@@ -312,23 +313,22 @@ public class HFileTargetIT implements Serializable {
     }, Writables.writables(Put.class));
   }
 
-  private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
+  private static PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
     return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() {
       @Override
       public KeyValue map(Pair<String, Long> input) {
         String w = input.first();
         long c = input.second();
-        KeyValue kv = new KeyValue(
+        return new KeyValue(
             Bytes.toBytes(w),
             TEST_FAMILY,
             TEST_QUALIFIER,
             Bytes.toBytes(c));
-        return kv;
       }
     }, Writables.writables(KeyValue.class));
   }
 
-  private PCollection<String> split(PCollection<String> in, final String regex) {
+  private static PCollection<String> split(PCollection<String> in, final String regex) {
     return in.parallelDo(new DoFn<String, String>() {
       @Override
       public void process(String input, Emitter<String> emitter) {
@@ -340,7 +340,7 @@ public class HFileTargetIT implements Serializable {
   }
 
   /** Reads the first value on a given row from a bunch of hfiles. */
-  private KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException {
+  private static KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException {
     List<KeyValueScanner> scanners = Lists.newArrayList();
     KeyValue fakeKV = KeyValue.createFirstOnRow(Bytes.toBytes(row));
     for (FileStatus e : fs.listStatus(mrOutputPath)) {
@@ -366,7 +366,7 @@ public class HFileTargetIT implements Serializable {
     return kv;
   }
 
-  private Path copyResourceFileToHDFS(String resourceName) throws IOException {
+  private static Path copyResourceFileToHDFS(String resourceName) throws IOException {
     Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
     FileSystem fs = FileSystem.get(conf);
     Path resultPath = getTempPathOnHDFS(resourceName);
@@ -383,14 +383,14 @@ public class HFileTargetIT implements Serializable {
     return resultPath;
   }
 
-  private Path getTempPathOnHDFS(String fileName) throws IOException {
+  private static Path getTempPathOnHDFS(String fileName) throws IOException {
     Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
     FileSystem fs = FileSystem.get(conf);
     Path result = new Path(TEMP_DIR, fileName);
     return result.makeQualified(fs);
   }
 
-  private long getWordCountFromTable(HTable table, String word) throws IOException {
+  private static long getWordCountFromTable(HTable table, String word) throws IOException {
     Get get = new Get(Bytes.toBytes(word));
     KeyValue keyValue = table.get(get).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER);
     if (keyValue == null) {

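For readers unfamiliar with the assertion swap above: assertNotSame performs the
same reference comparison as !=, but on failure it reports both operands instead
of throwing a bare AssertionError. A minimal standalone sketch (JUnit 4, not
part of the patch):

    import static org.junit.Assert.assertNotSame;
    import static org.junit.Assert.assertTrue;

    import org.junit.Test;

    public class AssertNotSameSketch {

      @Test
      public void referenceInequality() {
        String a = new String("word");
        String b = new String("word");
        // Both assertions check reference inequality and pass here, but on
        // failure assertNotSame prints the offending value, while the
        // assertTrue form gives no context at all.
        assertTrue(a != b);
        assertNotSame(a, b);
      }
    }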
http://git-wip-us.apache.org/repos/asf/crunch/blob/e0dbebf1/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 149e359..af32c1a 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.io.hbase;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
@@ -40,8 +41,6 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.hbase.HBaseSourceTarget;
-import org.apache.crunch.io.hbase.HBaseTarget;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.writable.Writables;
@@ -149,7 +148,7 @@ public class WordCountHBaseIT {
         String umask = br.readLine();
 
         int umaskBits = Integer.parseInt(umask, 8);
-        int permBits = 0777 & ~umaskBits;
+        int permBits = 0x1ff & ~umaskBits;
         String perms = Integer.toString(permBits, 8);
 
         conf.set("dfs.datanode.data.dir.perm", perms);
@@ -196,7 +195,7 @@ public class WordCountHBaseIT {
     }
   }
 
-  private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException {
+  private static void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException {
     File file = new File(baseDir, classDir);
     JarEntry e = new JarEntry(classDir);
     e.setTime(file.lastModified());
@@ -229,67 +228,60 @@ public class WordCountHBaseIT {
     String outputTableName = "crunch_counts_" + postFix;
     String otherTableName = "crunch_other_" + postFix;
     String joinTableName = "crunch_join_words_" + postFix;
-    
-    try {
 
-      HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
-      HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM);
-      HTable otherTable = hbaseTestUtil.createTable(Bytes.toBytes(otherTableName), COUNTS_COLFAM);
-      
-      int key = 0;
-      key = put(inputTable, key, "cat");
-      key = put(inputTable, key, "cat");
-      key = put(inputTable, key, "dog");
-      Scan scan = new Scan();
-      scan.addFamily(WORD_COLFAM);
-      HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
-      PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
-
-      Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
-      assertEquals(3, materialized.size());
-
-      PCollection<Put> puts = wordCount(words);
-      pipeline.write(puts, new HBaseTarget(outputTableName));
-      pipeline.write(puts, new HBaseTarget(otherTableName));
-      pipeline.done();
-
-      assertIsLong(outputTable, "cat", 2);
-      assertIsLong(outputTable, "dog", 1);
-      assertIsLong(otherTable, "cat", 2);
-      assertIsLong(otherTable, "dog", 1);
-      
-      // verify we can do joins.
-      HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM);
-      key = 0;
-      key = put(joinTable, key, "zebra");
-      key = put(joinTable, key, "donkey");
-      key = put(joinTable, key, "bird");
-      key = put(joinTable, key, "horse");
-      
-      Scan joinScan = new Scan();
-      joinScan.addFamily(WORD_COLFAM);
-      PTable<ImmutableBytesWritable, Result> other = pipeline.read(FromHBase.table(joinTableName, joinScan));
-      PCollection<String> joined = words.join(other).parallelDo(new StringifyFn(), Writables.strings());
-      assertEquals(ImmutableSet.of("cat,zebra", "cat,donkey", "dog,bird"),
-          ImmutableSet.copyOf(joined.materialize()));
-      pipeline.done();
-
-      //verify HBaseTarget supports deletes.
-      Scan clearScan = new Scan();
-      clearScan.addFamily(COUNTS_COLFAM);
-      pipeline = new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration());
-      HBaseSourceTarget clearSource = new HBaseSourceTarget(outputTableName, clearScan);
-      PTable<ImmutableBytesWritable, Result> counts = pipeline.read(clearSource);
-      pipeline.write(clearCounts(counts), new HBaseTarget(outputTableName));
-      pipeline.done();
-      
-      assertDeleted(outputTable, "cat");
-      assertDeleted(outputTable, "dog");
-      
-      
-    } finally {
-      // not quite sure...
-    }
+    HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
+    HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM);
+    HTable otherTable = hbaseTestUtil.createTable(Bytes.toBytes(otherTableName), COUNTS_COLFAM);
+
+    int key = 0;
+    key = put(inputTable, key, "cat");
+    key = put(inputTable, key, "cat");
+    key = put(inputTable, key, "dog");
+    Scan scan = new Scan();
+    scan.addFamily(WORD_COLFAM);
+    HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
+    PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
+
+    Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
+    assertEquals(3, materialized.size());
+
+    PCollection<Put> puts = wordCount(words);
+    pipeline.write(puts, new HBaseTarget(outputTableName));
+    pipeline.write(puts, new HBaseTarget(otherTableName));
+    pipeline.done();
+
+    assertIsLong(outputTable, "cat", 2);
+    assertIsLong(outputTable, "dog", 1);
+    assertIsLong(otherTable, "cat", 2);
+    assertIsLong(otherTable, "dog", 1);
+
+    // verify we can do joins.
+    HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM);
+    key = 0;
+    key = put(joinTable, key, "zebra");
+    key = put(joinTable, key, "donkey");
+    key = put(joinTable, key, "bird");
+    key = put(joinTable, key, "horse");
+
+    Scan joinScan = new Scan();
+    joinScan.addFamily(WORD_COLFAM);
+    PTable<ImmutableBytesWritable, Result> other = pipeline.read(FromHBase.table(joinTableName, joinScan));
+    PCollection<String> joined = words.join(other).parallelDo(new StringifyFn(), Writables.strings());
+    assertEquals(ImmutableSet.of("cat,zebra", "cat,donkey", "dog,bird"),
+        ImmutableSet.copyOf(joined.materialize()));
+    pipeline.done();
+
+    //verify HBaseTarget supports deletes.
+    Scan clearScan = new Scan();
+    clearScan.addFamily(COUNTS_COLFAM);
+    pipeline = new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration());
+    HBaseSourceTarget clearSource = new HBaseSourceTarget(outputTableName, clearScan);
+    PTable<ImmutableBytesWritable, Result> counts = pipeline.read(clearSource);
+    pipeline.write(clearCounts(counts), new HBaseTarget(outputTableName));
+    pipeline.done();
+
+    assertDeleted(outputTable, "cat");
+    assertDeleted(outputTable, "dog");
   }
 
   protected int put(HTable table, int key, String value) throws IOException {
@@ -299,20 +291,21 @@ public class WordCountHBaseIT {
     return key + 1;
   }
 
-  protected void assertIsLong(HTable table, String key, long i) throws IOException {
+  protected static void assertIsLong(HTable table, String key, long i) throws IOException {
     Get get = new Get(Bytes.toBytes(key));
     get.addFamily(COUNTS_COLFAM);
     Result result = table.get(get);
 
     byte[] rawCount = result.getValue(COUNTS_COLFAM, null);
-    assertTrue(rawCount != null);
-    assertEquals(new Long(i), new Long(Bytes.toLong(rawCount)));
+    assertNotNull(rawCount);
+    assertEquals(i, Bytes.toLong(rawCount));
   }
   
-  protected void assertDeleted(HTable table, String key) throws IOException {
-      Get get = new Get(Bytes.toBytes(key));
-      get.addFamily(COUNTS_COLFAM);
-      Result result = table.get(get);
-      assertTrue(result.isEmpty());
-    }
+  protected static void assertDeleted(HTable table, String key) throws IOException {
+    Get get = new Get(Bytes.toBytes(key));
+    get.addFamily(COUNTS_COLFAM);
+    Result result = table.get(get);
+    assertTrue(result.isEmpty());
+  }
+
 }

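The 0777 -> 0x1ff change above is behavior-preserving: both literals are decimal
511 (rwxrwxrwx). FindBugs flags octal literals because the leading zero is easy
to misread as decimal. A sketch of the umask arithmetic, assuming a typical
umask of 022 (not part of the patch):

    public class UmaskSketch {

      public static void main(String[] args) {
        int umaskBits = Integer.parseInt("022", 8);   // "022" parsed base-8 = 18
        // 0777 (octal) == 0x1ff (hex) == 511: the full permission bits.
        int permBits = 0x1ff & ~umaskBits;            // clear the umask'd bits
        System.out.println(Integer.toString(permBits, 8));  // prints 755
      }
    }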
http://git-wip-us.apache.org/repos/asf/crunch/blob/e0dbebf1/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index e7bca2b..ae35088 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -63,6 +63,7 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
   private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
   private final TimeRangeTracker trt = new TimeRangeTracker();
 
+  @Override
   public RecordWriter<Object, KeyValue> getRecordWriter(final TaskAttemptContext context)
       throws IOException, InterruptedException {
     Path outputPath = getDefaultWorkFile(context, "");
@@ -97,6 +98,7 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
         .create();
 
     return new RecordWriter<Object, KeyValue>() {
+      @Override
       public void write(Object row, KeyValue kv)
           throws IOException {
         if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
@@ -106,6 +108,7 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
         trt.includeTimestamp(kv);
       }
 
+      @Override
       public void close(TaskAttemptContext c)
           throws IOException, InterruptedException {
         writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,

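The @Override annotations added above are cheap insurance: inside an anonymous
class, a method whose signature drifts from the interface silently becomes a new
method; with the annotation the compiler rejects it. A sketch using a
hypothetical interface, not Crunch code:

    interface RowWriter {
      void write(String row);
    }

    public class OverrideSketch {

      public static void main(String[] args) {
        RowWriter w = new RowWriter() {
          // If this signature drifted (say, to write(Object row)), @Override
          // would turn the silent overload into a compile error.
          @Override
          public void write(String row) {
            System.out.println("wrote " + row);
          }
        };
        w.write("row-1");
      }
    }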
http://git-wip-us.apache.org/repos/asf/crunch/blob/e0dbebf1/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
index d119d7c..e3d4eb2 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
@@ -29,8 +29,10 @@ import org.apache.crunch.types.avro.ReflectDataFactory;
  */
 public class ScalaReflectDataFactory extends ReflectDataFactory {
 
+  @Override
   public ReflectData getReflectData() { return ScalaSafeReflectData.get(); }
   
+  @Override
   public <T> ReflectDatumReader<T> getReader(Schema schema) {
     return new ScalaSafeReflectDatumReader<T>(schema);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/e0dbebf1/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
index 80f265c..123b45e 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import org.apache.avro.Schema;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -30,9 +31,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 import scala.collection.JavaConversions;
 
-/**
- *
- */
 public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
   
   public ScalaSafeReflectDatumReader(Schema schema) {
@@ -44,9 +42,9 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
       ResolvingDecoder in) throws IOException {
     Schema expectedType = expected.getElementType();
     long l = in.readArrayStart();
-    long base = 0;
     if (l > 0) {
       Object array = newArray(old, (int) l, expected);
+      long base = 0;
       do {
         for (long i = 0; i < l; i++) {
           addToArray(array, base + i, read(peekArray(array), expectedType, in));
@@ -82,9 +80,11 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
         scala.collection.Iterable it = toIter(array);
         if (scala.collection.immutable.List.class.isAssignableFrom(collectionClass)) {
           return it.toList();
-        } else if (scala.collection.mutable.Buffer.class.isAssignableFrom(collectionClass)) {
+        }
+        if (scala.collection.mutable.Buffer.class.isAssignableFrom(collectionClass)) {
           return it.toBuffer();
-        } else if (scala.collection.immutable.Set.class.isAssignableFrom(collectionClass)) {
+        }
+        if (scala.collection.immutable.Set.class.isAssignableFrom(collectionClass)) {
           return it.toSet();
         }
         return it;
@@ -98,7 +98,7 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
   }
   
   @Override
-  @SuppressWarnings(value="unchecked")
+  @SuppressWarnings("unchecked")
   protected Object newArray(Object old, int size, Schema schema) {
     ScalaSafeReflectData data = ScalaSafeReflectData.get();
     Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
@@ -110,14 +110,15 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
       }
       if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) ||
           collectionClass.isAssignableFrom(ArrayList.class)) {
-        return new ArrayList();
+        return Lists.newArrayList();
       }
       return ReflectionUtils.newInstance(collectionClass, null);
     }
     Class elementClass = ScalaSafeReflectData.getClassProp(schema,
         ScalaSafeReflectData.ELEMENT_PROP);
-    if (elementClass == null)
+    if (elementClass == null) {
       elementClass = data.getClass(schema.getElementType());
+    }
     return Array.newInstance(elementClass, size);
   }
 }

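On the Lists.newArrayList() substitution above: this code predates the Java 7
diamond operator, so a bare new ArrayList() is a raw type, while Guava's static
factory infers the element type from the assignment target. A standalone sketch
(assumes Guava on the classpath; not part of the patch):

    import java.util.List;

    import com.google.common.collect.Lists;

    public class NewArrayListSketch {

      public static void main(String[] args) {
        // The element type is inferred from the target type, so there is no
        // raw ArrayList and no unchecked warning, even pre-Java 7.
        List<String> words = Lists.newArrayList();
        words.add("ok");
        System.out.println(words);
      }
    }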
http://git-wip-us.apache.org/repos/asf/crunch/blob/e0dbebf1/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java b/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
index 1721eca..3cd175f 100644
--- a/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
+++ b/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java
@@ -48,7 +48,7 @@ public final class TemporaryPath extends ExternalResource {
   private final Set<String> confKeys;
 
   /**
-   * Construct {@link TemporaryPath}.
+   * Construct {@code TemporaryPath}.
    * @param confKeys {@link Configuration} keys containing directories to override
    */
   public TemporaryPath(String... confKeys) {
@@ -130,7 +130,7 @@ public final class TemporaryPath extends ExternalResource {
     return copyResourceFile(resourceName).getAbsolutePath();
   }
 
-  private void copy(String resourceName, File dest) throws IOException {
+  private static void copy(String resourceName, File dest) throws IOException {
     Files.copy(Resources.newInputStreamSupplier(Resources.getResource(resourceName)), dest);
   }
 
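Finally, the change that recurs throughout this patch: private helpers that read
no instance state become static. Beyond documenting independence from `this`,
this matters in Serializable test classes like HFileTargetIT above, because an
anonymous class created inside an instance method captures the enclosing
instance, while one created inside a static method does not. A minimal sketch of
the rule (illustrative only, not Crunch code):

    import java.io.Serializable;

    public class StaticHelperSketch implements Serializable {

      // Reads no instance state, so it can be static; the anonymous class
      // created inside it does not capture StaticHelperSketch.this and so
      // never drags the enclosing instance into serialization.
      private static Runnable makeGreeter(final String name) {
        return new Runnable() {
          @Override
          public void run() {
            System.out.println("hello " + name);
          }
        };
      }

      public static void main(String[] args) {
        makeGreeter("crunch").run();
      }
    }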

