tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [2/2] git commit: TAJO-536: Fix warnings in tajo-core-storage. (jinho)
Date Mon, 27 Jan 2014 11:19:29 GMT
TAJO-536: Fix warnings in tajo-core-storage. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5ac8c77c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5ac8c77c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5ac8c77c

Branch: refs/heads/master
Commit: 5ac8c77cd6d77d367e19c2bb7937e835c191199c
Parents: eb563ad
Author: jinossy <jinossy@gmail.com>
Authored: Mon Jan 27 20:18:46 2014 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Mon Jan 27 20:18:46 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 dev-support/findbugs-exclude.xml                |  32 ++-
 .../java/org/apache/tajo/datum/BitDatum.java    |   8 +-
 .../java/org/apache/tajo/datum/BlobDatum.java   |  12 +-
 .../org/apache/tajo/datum/BooleanDatum.java     |   8 +-
 .../java/org/apache/tajo/datum/CharDatum.java   |  12 +-
 .../java/org/apache/tajo/datum/DateDatum.java   |   2 +-
 .../main/java/org/apache/tajo/datum/Datum.java  |   6 +-
 .../java/org/apache/tajo/datum/Float4Datum.java |  14 +-
 .../java/org/apache/tajo/datum/Float8Datum.java |  14 +-
 .../java/org/apache/tajo/datum/Inet4Datum.java  |  40 ++--
 .../java/org/apache/tajo/datum/Int2Datum.java   |  14 +-
 .../java/org/apache/tajo/datum/Int4Datum.java   |  14 +-
 .../java/org/apache/tajo/datum/Int8Datum.java   |  14 +-
 .../org/apache/tajo/datum/NumericDatum.java     |   2 +-
 .../org/apache/tajo/datum/ProtobufDatum.java    |  10 +-
 .../java/org/apache/tajo/datum/TextDatum.java   |  10 +-
 .../java/org/apache/tajo/datum/TimeDatum.java   |   2 +-
 .../org/apache/tajo/datum/TimestampDatum.java   |   2 +-
 .../main/java/org/apache/tajo/util/Bytes.java   |  25 ++
 .../org/apache/tajo/engine/eval/SignedEval.java |   2 +-
 .../java/org/apache/tajo/storage/Tuple.java     |   2 +
 .../tajo/storage/AbstractStorageManager.java    |  23 +-
 .../org/apache/tajo/storage/FrameTuple.java     |  11 +-
 .../java/org/apache/tajo/storage/LazyTuple.java |  26 ++-
 .../java/org/apache/tajo/storage/RowFile.java   |  18 +-
 .../storage/TextSerializerDeserializer.java     |   4 +-
 .../java/org/apache/tajo/storage/Tuple.java     |   2 +
 .../apache/tajo/storage/TupleComparator.java    |   6 +
 .../java/org/apache/tajo/storage/VTuple.java    |  25 +-
 .../tajo/storage/fragment/FileFragment.java     |   2 +-
 .../apache/tajo/storage/index/bst/BSTIndex.java |  31 ++-
 .../storage/rcfile/BytesRefArrayWritable.java   |   5 +
 .../tajo/storage/rcfile/BytesRefWritable.java   |   2 +-
 .../NonSyncChunkedByteArrayOutputStream.java    | 229 -------------------
 .../org/apache/tajo/storage/rcfile/RCFile.java  | 104 ++++-----
 .../tajo/storage/trevni/TrevniScanner.java      |   1 -
 .../apache/tajo/storage/v2/CSVFileScanner.java  |   7 +-
 .../tajo/storage/v2/DiskFileScanScheduler.java  |   2 +-
 .../apache/tajo/storage/v2/DiskMountInfo.java   |  17 +-
 .../org/apache/tajo/storage/v2/DiskUtil.java    |   8 +-
 .../apache/tajo/storage/v2/FileScanRunner.java  |  13 +-
 .../java/org/apache/tajo/storage/v2/RCFile.java | 134 ++++++-----
 .../apache/tajo/storage/v2/RCFileScanner.java   |  30 +--
 .../apache/tajo/storage/v2/ScanScheduler.java   |   2 +-
 .../tajo/storage/v2/ScheduledInputStream.java   |   4 +-
 .../tajo/storage/v2/StorageManagerV2.java       |   2 +-
 .../org/apache/tajo/storage/TestLazyTuple.java  |  20 ++
 .../org/apache/tajo/storage/TestVTuple.java     |  19 ++
 .../org/apache/tajo/jdbc/MetaDataTuple.java     |   5 +
 50 files changed, 426 insertions(+), 573 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a867658..0f82e9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -376,6 +376,8 @@ Release 0.8.0 - unreleased
 
   TASKS
 
+    TAJO-536: Fix warnings in tajo-core-storage. (jinho)
+
     TAJO-545: MySQLStore Documentation. (jaehwa)
 
     TAJO-526: HCatalogStore Documentation. (jaehwa)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/dev-support/findbugs-exclude.xml b/dev-support/findbugs-exclude.xml
index 99f316c..e30545a 100644
--- a/dev-support/findbugs-exclude.xml
+++ b/dev-support/findbugs-exclude.xml
@@ -25,6 +25,29 @@
 
   <Match>
     <!--
+    This method contains a call to java.lang.Object.wait() which is not in a loop.  If the monitor is used for
+    multiple conditions, the condition the caller intended to wait for might not be the one that actually occurred.
+    -->
+
+    <!-- We will refactor the async storage engine-->
+    <Class name="org.apache.tajo.storage.v2.FileScannerV2"/>
+    <Bug pattern="WA_NOT_IN_LOOP"/>
+  </Match>
+
+  <Match>
+    <!--
+    This method contains a call to java.lang.Object.wait() which is not in a loop.  If the monitor is used for
+    multiple conditions, the condition the caller intended to wait for might not be the one that actually occurred.
+    -->
+
+    <!-- We will refactor the async storage engine-->
+    <Class name="org.apache.tajo.storage.v2.ScheduledInputStream"/>
+    <Bug pattern="WA_NOT_IN_LOOP"/>
+  </Match>
+
+
+  <Match>
+    <!--
     A mutable static field could be changed by malicious code or by accident. The field could be
     made package protected to avoid this vulnerability.
     !-->
@@ -139,13 +162,4 @@
     !-->
     <Bug pattern="MS_MUTABLE_ARRAY"/>
   </Match>
-
-  <Match>
-    <!--
-     The logic explicitly checks equality of two floating point numbers. Ignore the warning
-    !-->
-    <Class name="org.apache.hadoop.hbase.master.AssignmentVerificationReport"/>
-    <Bug pattern="FE_FLOATING_POINT_EQUALITY"/>
-  </Match>
-
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/BitDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BitDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BitDatum.java
index 9838570..5194420 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/BitDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/BitDatum.java
@@ -25,14 +25,10 @@ import org.apache.tajo.util.NumberUtil;
 
 public class BitDatum extends Datum {
   private static final int size = 1;
-  @Expose	byte val;
-	
-	public BitDatum() {
-		super(TajoDataTypes.Type.BIT);
-	}
+  @Expose	final byte val;
 	
 	public BitDatum(byte val) {
-		this();
+		super(TajoDataTypes.Type.BIT);
 		this.val = val;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
index 1ba8a64..4c4f557 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
@@ -31,29 +31,25 @@ import java.util.Arrays;
 import static org.apache.tajo.common.TajoDataTypes.Type.BLOB;
 
 public class BlobDatum extends Datum {
-	@Expose private byte [] val;
+	@Expose private final byte [] val;
 	private ByteBuffer bb = null;
 
-	public BlobDatum() {
-		super(BLOB);
-	}
-	
 	public BlobDatum(byte[] val) {
-		this();
+    super(BLOB);
 		this.val = val;
 		this.bb = ByteBuffer.wrap(val);	
 		bb.flip();
 	}
 
   public BlobDatum(byte[] val, int offset, int length) {
-    this();
+    super(BLOB);
     this.val = val;
     this.bb = ByteBuffer.wrap(val, offset, length);
     bb.flip();
   }
 	
 	public BlobDatum(ByteBuffer val) {
-		this();
+    super(BLOB);
 		this.val = val.array();
 		this.bb = val.duplicate();
 		bb.flip();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
index d975cd8..33a2a69 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
@@ -23,7 +23,7 @@ import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.exception.InvalidOperationException;
 
 public class BooleanDatum extends Datum {
-  @Expose private boolean val;
+  @Expose private final boolean val;
   public static final String TRUE_STRING ="t";
   public static final String FALSE_STRING ="f";
   public static final BooleanDatum TRUE = new BooleanDatum(true);
@@ -54,10 +54,6 @@ public class BooleanDatum extends Datum {
       new Datum [] {NullDatum.get(), TRUE, FALSE          }  // false
   };
 
-  protected BooleanDatum() {
-    super(TajoDataTypes.Type.BOOLEAN);
-  }
-
   protected BooleanDatum(boolean val) {
     super(TajoDataTypes.Type.BOOLEAN);
     this.val = val;
@@ -69,7 +65,7 @@ public class BooleanDatum extends Datum {
   }
 
   protected BooleanDatum(int byteVal) {
-    this();
+    super(TajoDataTypes.Type.BOOLEAN);
     this.val = byteVal == TRUE_INT;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
index 3987078..ba66320 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
@@ -27,16 +27,12 @@ import java.util.Arrays;
 import static org.apache.tajo.common.TajoDataTypes.Type;
 
 public class CharDatum extends Datum {
-  @Expose private int size;
-  @Expose private byte[] bytes;
+  @Expose private final int size;
+  @Expose private final byte[] bytes;
   private String chars = null;
 
-	public CharDatum() {
-		super(Type.CHAR);
-	}
-
 	public CharDatum(byte val) {
-    this();
+    super(Type.CHAR);
     this.size = 1;
     bytes = new byte[size];
     bytes[0] = val;
@@ -47,7 +43,7 @@ public class CharDatum extends Datum {
   }
 
   public CharDatum(byte [] bytes) {
-    this();
+    super(Type.CHAR);
     this.bytes = bytes;
     this.size = bytes.length;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
index 89a4a99..5e0c26d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
@@ -31,7 +31,7 @@ public class DateDatum extends Datum {
   /** ISO 8601/SQL standard format - ex) 1997-12-17 */
   public static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd";
   private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormat.forPattern(DEFAULT_FORMAT_STRING);
-  private LocalDate date;
+  private final LocalDate date;
 
   public DateDatum(int value) {
     super(TajoDataTypes.Type.DATE);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
index 0b30e87..aee5621 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
@@ -27,11 +27,7 @@ import org.apache.tajo.json.GsonObject;
 import static org.apache.tajo.common.TajoDataTypes.Type;
 
 public abstract class Datum implements Comparable<Datum>, GsonObject {
-  @Expose	private Type type;
-
-  @SuppressWarnings("unused")
-  private Datum() {
-  }
+  @Expose	private final Type type;
 
   public Datum(Type type) {
     this.type = type;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
index 9754b0a..0c65dfc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
@@ -28,19 +28,15 @@ import java.nio.ByteBuffer;
 
 public class Float4Datum extends NumericDatum {
   private static final int size = 4;
-  @Expose float val;
-
-	public Float4Datum() {
-		super(TajoDataTypes.Type.FLOAT4);
-	}
+  @Expose final float val;
 
 	public Float4Datum(float val) {
-		this();
+    super(TajoDataTypes.Type.FLOAT4);
 		this.val = val;
 	}
 
   public Float4Datum(byte[] bytes) {
-    this();
+    super(TajoDataTypes.Type.FLOAT4);
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     this.val = bb.getFloat();
   }
@@ -302,7 +298,7 @@ public class Float4Datum extends NumericDatum {
   }
 
   @Override
-  public void inverseSign() {
-    this.val = - val;
+  public NumericDatum inverseSign() {
+    return new Float4Datum(-val);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
index 782bfa3..5566b3a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
@@ -32,19 +32,15 @@ import java.nio.ByteBuffer;
 
 public class Float8Datum extends NumericDatum {
   private static final int size = 8;
-  @Expose private double val;
-
-	public Float8Datum() {
-		super(TajoDataTypes.Type.FLOAT8);
-	}
+  @Expose private final double val;
 
 	public Float8Datum(double val) {
-		this();
+    super(TajoDataTypes.Type.FLOAT8);
 		this.val = val;
 	}
 
   public Float8Datum(byte[] bytes) {
-    this();
+    super(TajoDataTypes.Type.FLOAT8);
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     this.val = bb.getDouble();
   }
@@ -294,7 +290,7 @@ public class Float8Datum extends NumericDatum {
   }
 
   @Override
-  public void inverseSign() {
-    this.val = -val;
+  public NumericDatum inverseSign() {
+    return new Float8Datum(-val);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
index fe06d55..0663f0a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
@@ -27,37 +27,33 @@ import static org.apache.tajo.common.TajoDataTypes.Type;
 
 public class Inet4Datum extends Datum {
   private static final int size = 4;
-  @Expose private int address;
-
-	public Inet4Datum() {
-		super(Type.INET4);
-	}
+  @Expose private final int address;
 
 	public Inet4Datum(String addr) {
-		this();
+    super(Type.INET4);
 		String [] elems = addr.split("\\.");
-		address  = Integer.valueOf(elems[3]) & 0xFF;
-    address |= ((Integer.valueOf(elems[2]) << 8) & 0xFF00);
-    address |= ((Integer.valueOf(elems[1]) << 16) & 0xFF0000);
-    address |= ((Integer.valueOf(elems[0]) << 24) & 0xFF000000);
-	}
+    address = Integer.valueOf(elems[3]) & 0xFF
+        | ((Integer.valueOf(elems[2]) << 8) & 0xFF00)
+        | ((Integer.valueOf(elems[1]) << 16) & 0xFF0000)
+        | ((Integer.valueOf(elems[0]) << 24) & 0xFF000000);
+  }
 
 	public Inet4Datum(byte[] addr) {
-		this();
+    super(Type.INET4);
 		Preconditions.checkArgument(addr.length == size);
-		address  = addr[3] & 0xFF;
-    address |= ((addr[2] << 8) & 0xFF00);
-    address |= ((addr[1] << 16) & 0xFF0000);
-    address |= ((addr[0] << 24) & 0xFF000000);
-	}
+    address = addr[3] & 0xFF
+        | ((addr[2] << 8) & 0xFF00)
+        | ((addr[1] << 16) & 0xFF0000)
+        | ((addr[0] << 24) & 0xFF000000);
+  }
 
   public Inet4Datum(byte[] addr, int offset, int length) {
-    this();
+    super(Type.INET4);
     Preconditions.checkArgument(length == size);
-    address  = addr[offset + 3] & 0xFF;
-    address |= ((addr[offset + 2] << 8) & 0xFF00);
-    address |= ((addr[offset + 1] << 16) & 0xFF0000);
-    address |= ((addr[offset] << 24) & 0xFF000000);
+    address = addr[offset + 3] & 0xFF
+        | ((addr[offset + 2] << 8) & 0xFF00)
+        | ((addr[offset + 1] << 16) & 0xFF0000)
+        | ((addr[offset] << 24) & 0xFF000000);
   }
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
index a3b4b2f..e24d7be 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
@@ -28,19 +28,15 @@ import java.nio.ByteBuffer;
 
 public class Int2Datum extends NumericDatum {
   private static final int size = 2;
-  @Expose private short val;
-
-  public Int2Datum() {
-    super(TajoDataTypes.Type.INT2);
-  }
+  @Expose private final short val;
 
 	public Int2Datum(short val) {
-		this();
+    super(TajoDataTypes.Type.INT2);
 		this.val = val;
 	}
 
   public Int2Datum(byte[] bytes) {
-    this();
+    super(TajoDataTypes.Type.INT2);
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     this.val = bb.getShort();
   }
@@ -293,7 +289,7 @@ public class Int2Datum extends NumericDatum {
   }
 
   @Override
-  public void inverseSign() {
-    this.val = (short) -val;
+  public NumericDatum inverseSign() {
+    return new Int2Datum((short) -val);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
index 9eb02e1..9cd0c06 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
@@ -28,19 +28,15 @@ import java.nio.ByteBuffer;
 
 public class Int4Datum extends NumericDatum {
   private static final int size = 4;
-  @Expose private int val;
-
-	public Int4Datum() {
-		super(Type.INT4);
-	}
+  @Expose private final int val;
 
 	public Int4Datum(int val) {
-		this();
+    super(Type.INT4);
 		this.val = val;
 	}
 
   public Int4Datum(byte[] bytes) {
-    this();
+    super(Type.INT4);
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     this.val = bb.getInt();
   }
@@ -297,7 +293,7 @@ public class Int4Datum extends NumericDatum {
   }
 
   @Override
-  public void inverseSign() {
-    this.val = - val;
+  public NumericDatum inverseSign() {
+    return new Int4Datum(-val);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
index c46106f..1104a3c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
@@ -29,19 +29,15 @@ import java.nio.ByteBuffer;
 
 public class Int8Datum extends NumericDatum {
   private static final int size = 8;
-  @Expose private long val;
+  @Expose private final long val;
 
-	public Int8Datum() {
-		super(TajoDataTypes.Type.INT8);
-	}
-	
 	public Int8Datum(long val) {
-		this();
+    super(TajoDataTypes.Type.INT8);
 		this.val = val;
 	}
 
   public Int8Datum(byte[] bytes) {
-    this();
+    super(TajoDataTypes.Type.INT8);
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     val = bb.getLong();
   }
@@ -305,7 +301,7 @@ public class Int8Datum extends NumericDatum {
   }
 
   @Override
-  public void inverseSign() {
-    this.val = -val;
+  public NumericDatum inverseSign() {
+    return new Int8Datum(-val);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/NumericDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/NumericDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/NumericDatum.java
index b12ae91..e021781 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/NumericDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/NumericDatum.java
@@ -35,5 +35,5 @@ public abstract class NumericDatum extends Datum {
   
   public abstract Datum divide(Datum datum);
   
-  public abstract void inverseSign();
+  public abstract NumericDatum inverseSign();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java
index 46df4f0..822a1e9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/ProtobufDatum.java
@@ -21,21 +21,13 @@ import com.google.protobuf.Message;
 import org.apache.tajo.common.TajoDataTypes;
 
 public class ProtobufDatum extends Datum {
-  private Message value;
-
-  public ProtobufDatum() {
-    super(TajoDataTypes.Type.PROTOBUF);
-  }
+  private final Message value;
 
   public ProtobufDatum(Message message) {
     super(TajoDataTypes.Type.PROTOBUF);
     this.value = message;
   }
 
-  public void set(Message message) {
-    this.value = message;
-  }
-
   public Message get() {
     return value;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index fba3fe9..6eb056f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -28,18 +28,14 @@ import java.util.Arrays;
 import java.util.Comparator;
 
 public class TextDatum extends Datum {
-  @Expose private int size;
-  @Expose private byte [] bytes;
+  @Expose private final int size;
+  @Expose private final byte[] bytes;
 
   public static final TextDatum EMPTY_TEXT = new TextDatum("");
   public static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
 
-  public TextDatum() {
-    super(TajoDataTypes.Type.TEXT);
-  }
-
   public TextDatum(byte[] bytes) {
-    this();
+    super(TajoDataTypes.Type.TEXT);
     this.bytes = bytes;
     this.size = bytes.length;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
index 9f67249..55023da 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
@@ -31,7 +31,7 @@ public class TimeDatum extends Datum {
   /** ISO 8601/SQL standard format - ex) 07:37:16-08 */
   public static final String DEFAULT_FORMAT_STRING = "HH:mm:ss";
   private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormat.forPattern(DEFAULT_FORMAT_STRING);
-  private LocalTime time;
+  private final LocalTime time;
 
   public TimeDatum(long value) {
     super(TajoDataTypes.Type.TIME);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
index 039439c..3ff1fca 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
@@ -33,7 +33,7 @@ public class TimestampDatum extends Datum {
   public static final String FRACTION_FORMAT_STRING = "yyyy-MM-dd HH:mm:ss.SSS";
   private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormat.forPattern(DEFAULT_FORMAT_STRING);
   private static final DateTimeFormatter FRACTION_FORMATTER = DateTimeFormat.forPattern(FRACTION_FORMAT_STRING);
-  private DateTime dateTime;
+  private final DateTime dateTime;
 
   public TimestampDatum(int timestamp) {
     super(TajoDataTypes.Type.TIMESTAMP);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
index cfabf21..0b5a470 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
@@ -1775,4 +1775,29 @@ public class Bytes {
     }
     return nread;
   }
+
+  /**
+   * Similar to readFully(). Skips bytes in a loop.
+   * @param in The DataInput to skip bytes from
+   * @param len number of bytes to skip.
+   * @throws IOException if it could not skip requested number of bytes
+   * for any reason (including EOF)
+   */
+  public static void skipFully(DataInput in, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      long ret = in.skipBytes(amt);
+      if (ret == 0) {
+        // skipBytes() may return 0 even if we're not at EOF, so probe with
+        // readByte(): it consumes one byte, or throws EOFException at the end.
+        int b = in.readByte();
+        if (b == -1) {
+          throw new EOFException( "Premature EOF from inputStream after " +
+              "skipping " + (len - amt) + " byte(s).");
+        }
+        ret = 1;
+      }
+      amt -= ret;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
index e0f50c1..3c2eeb4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
@@ -57,7 +57,7 @@ public class SignedEval extends EvalNode implements Cloneable {
   public Datum eval(Schema schema, Tuple tuple) {
     NumericDatum result = childEval.eval(schema, tuple);
     if (negative) {
-      result.inverseSign();
+      return result.inverseSign();
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
index 711666f..3b6a550 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -77,4 +77,6 @@ public interface Tuple extends Cloneable {
   public TextDatum getText(int fieldId);
 
   public Tuple clone() throws CloneNotSupportedException;
+
+  public Datum[] getValues();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
index 2ff1a63..91a535e 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -198,7 +198,6 @@ public abstract class AbstractStorageManager {
 
   public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
-    TableMeta meta = getTableMeta(tablePath);
     List<FileFragment> listTablets = new ArrayList<FileFragment>();
     FileFragment tablet;
 
@@ -227,7 +226,6 @@ public abstract class AbstractStorageManager {
       throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
 
-    TableMeta meta = getTableMeta(tablePath);
     long defaultBlockSize = size;
     List<FileFragment> listTablets = new ArrayList<FileFragment>();
     FileFragment tablet;
@@ -613,9 +611,24 @@ public abstract class AbstractStorageManager {
     return splits;
   }
 
-  private class InvalidInputException extends IOException {
-    public InvalidInputException(
-        List<IOException> errors) {
+  private static class InvalidInputException extends IOException {
+    List<IOException> errors;
+    public InvalidInputException(List<IOException> errors) {
+      this.errors = errors;
+    }
+
+    @Override
+    public String getMessage(){
+       StringBuffer sb = new StringBuffer();
+      int messageLimit = Math.min(errors.size(), 10);
+      for (int i = 0; i < messageLimit ; i ++) {
+        sb.append(errors.get(i).getMessage()).append("\n");
+      }
+
+      if(messageLimit < errors.size())
+        sb.append("skipped .....").append("\n");
+
+      return sb.toString();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
index 8979b4b..f05a316 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -33,7 +33,7 @@ import java.net.InetAddress;
  * It contains two tuples and pretends to be one instance of Tuple for
  * join qual evaluatations.
  */
-public class FrameTuple implements Tuple {
+public class FrameTuple implements Tuple, Cloneable {
   private int size;
   private int leftSize;
   
@@ -199,7 +199,14 @@ public class FrameTuple implements Tuple {
 
   @Override
   public Tuple clone() throws CloneNotSupportedException {
-    return new FrameTuple(this.left.clone(), this.right.clone());
+    FrameTuple frameTuple = (FrameTuple) super.clone();
+    frameTuple.set(this.left.clone(), this.right.clone());
+    return frameTuple;
+  }
+
+  @Override
+  public Datum[] getValues(){
+    throw new UnsupportedException();
   }
 
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 9967d7a..4d484df 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -25,7 +25,7 @@ import org.apache.tajo.datum.exception.InvalidCastException;
 import java.net.InetAddress;
 import java.util.Arrays;
 
-public class LazyTuple implements Tuple {
+public class LazyTuple implements Tuple, Cloneable {
   private long offset;
   private Datum[] values;
   private byte[][] textBytes;
@@ -47,12 +47,12 @@ public class LazyTuple implements Tuple {
   }
 
   public LazyTuple(LazyTuple tuple) {
-    this.values = new Datum[tuple.size()];
-    System.arraycopy(tuple.values, 0, values, 0, tuple.size());
+    this.values = tuple.getValues();
     this.offset = tuple.offset;
     this.schema = tuple.schema;
-    this.textBytes = tuple.textBytes.clone();
+    this.textBytes = new byte[size()][];
     this.nullBytes = tuple.nullBytes;
+    this.serializeDeserialize = tuple.serializeDeserialize;
   }
 
   @Override
@@ -262,7 +262,8 @@ public class LazyTuple implements Tuple {
     return hashCode;
   }
 
-  public Datum[] toArray() {
+  @Override
+  public Datum[] getValues() {
     Datum[] datums = new Datum[values.length];
     for (int i = 0; i < values.length; i++) {
       datums[i] = get(i);
@@ -272,17 +273,18 @@ public class LazyTuple implements Tuple {
 
   @Override
   public Tuple clone() throws CloneNotSupportedException {
-    return new LazyTuple(this);
+    LazyTuple lazyTuple = (LazyTuple) super.clone();
+
+    lazyTuple.values = getValues(); //shallow copy
+    lazyTuple.textBytes = new byte[size()][];
+    return lazyTuple;
   }
 
   @Override
   public boolean equals(Object obj) {
-    if (obj instanceof LazyTuple) {
-      LazyTuple other = (LazyTuple) obj;
-      return Arrays.equals(toArray(), other.toArray());
-    } else if (obj instanceof VTuple) {
-      VTuple other = (VTuple) obj;
-      return Arrays.equals(toArray(), other.values);
+    if (obj instanceof Tuple) {
+      Tuple other = (Tuple) obj;
+      return Arrays.equals(getValues(), other.getValues());
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
index bbe0529..1e89f31 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -35,6 +35,7 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.BitArray;
+import org.apache.tajo.util.Bytes;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -64,7 +65,6 @@ public class RowFile {
     private ByteBuffer buffer;
     private final int tupleHeaderSize;
     private BitArray nullFlags;
-    private int numBitsOfNullFlags;
     private long bufferStartPos;
 
     public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
@@ -74,9 +74,9 @@ public class RowFile {
       SYNC_INTERVAL =
           conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname,
               SYNC_SIZE * 100);
-      numBitsOfNullFlags = (int) Math.ceil(((double)schema.getColumnNum()));
-      nullFlags = new BitArray(numBitsOfNullFlags);
-      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE/8);
+
+      nullFlags = new BitArray(schema.getColumnNum());
+      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
       this.start = fragment.getStartKey();
       this.end = this.start + fragment.getEndKey();
     }
@@ -122,7 +122,7 @@ public class RowFile {
 
     private void readHeader() throws IOException {
       SYNC_INTERVAL = in.readInt();
-      in.read(this.sync, 0, SYNC_HASH_SIZE);
+      Bytes.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
     }
 
     /**
@@ -320,8 +320,6 @@ public class RowFile {
     private ByteBuffer buffer;
 
     private BitArray nullFlags;
-    private int numBitsOfNullFlags;
-
     // statistics
     private TableStatistics stats;
 
@@ -361,8 +359,7 @@ public class RowFile {
 
       buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
 
-      numBitsOfNullFlags = (int) Math.ceil(((double)schema.getColumnNum()));
-      nullFlags = new BitArray(numBitsOfNullFlags);
+      nullFlags = new BitArray(schema.getColumnNum());
 
       if (enabledStats) {
         this.stats = new TableStatistics(this.schema);
@@ -436,6 +433,7 @@ public class RowFile {
               break;
             case INET6:
               buffer.put(t.getIPv6Bytes(i));
+              break;
             case NULL_TYPE:
               nullFlags.set(i);
               break;
@@ -490,7 +488,7 @@ public class RowFile {
       }
     }
 
-    synchronized void checkAndWriteSync() throws IOException {
+    private void checkAndWriteSync() throws IOException {
       if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
         sync();
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
index 9995a4a..07ea79b 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -31,8 +31,8 @@ import java.io.OutputStream;
 
 //Compatibility with Apache Hive
 public class TextSerializerDeserializer implements SerializerDeserializer {
-  public static byte[] trueBytes = "true".getBytes();
-  public static byte[] falseBytes = "false".getBytes();
+  public static final byte[] trueBytes = "true".getBytes();
+  public static final byte[] falseBytes = "false".getBytes();
   private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Tuple.java
index 6d0e833..ba35988 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -77,4 +77,6 @@ public interface Tuple extends Cloneable {
   public TextDatum getText(int fieldId);
 
   public Tuple clone() throws CloneNotSupportedException;
+
+  public Datum[] getValues();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index f6f5f81..69c1c04 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -114,6 +115,11 @@ public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComp
   }
 
   @Override
+  public int hashCode() {
+    return Objects.hashCode(sortKeyIds);
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (obj instanceof TupleComparator) {
       TupleComparator other = (TupleComparator) obj;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/VTuple.java
index 48d6fd1..878c05e 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -25,12 +25,12 @@ import org.apache.tajo.datum.exception.InvalidCastException;
 import java.net.InetAddress;
 import java.util.Arrays;
 
-public class VTuple implements Tuple {
+public class VTuple implements Tuple, Cloneable {
 	@Expose public Datum [] values;
 	@Expose private long offset;
 	
 	public VTuple(int size) {
-		values = new Datum [size];
+		values = new Datum[size];
 	}
 
   public VTuple(Tuple tuple) {
@@ -169,7 +169,11 @@ public class VTuple implements Tuple {
 
   @Override
   public Tuple clone() throws CloneNotSupportedException {
-    return new VTuple(this);
+    VTuple tuple = (VTuple) super.clone();
+
+    tuple.values = new Datum[size()];
+    System.arraycopy(values, 0, tuple.values, 0, size()); //shallow copy
+    return tuple;
   }
 
   public String toString() {
@@ -207,15 +211,16 @@ public class VTuple implements Tuple {
 	}
 
   @Override
+  public Datum[] getValues() {
+    return values;
+  }
+
+  @Override
   public boolean equals(Object obj) {
-    if (obj instanceof VTuple) {
-      VTuple other = (VTuple) obj;
-      return Arrays.equals(values, other.values);
-    } else if (obj instanceof LazyTuple) {
-      LazyTuple other = (LazyTuple) obj;
-      return Arrays.equals(values, other.toArray());
+    if (obj instanceof Tuple) {
+      Tuple other = (Tuple) obj;
+      return Arrays.equals(getValues(), other.getValues());
     }
-
     return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
index bbf98e3..ea8bf9f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -34,7 +34,7 @@ import java.util.List;
 import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
-public class FileFragment implements Fragment, Comparable<FileFragment> {
+public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
   @Expose private String tableName; // required
   @Expose private Path uri; // required
   @Expose private Long startOffset; // required

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index 7f9f68e..bc8fe96 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -33,6 +33,7 @@ import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.IndexMethod;
 import org.apache.tajo.storage.index.IndexWriter;
 import org.apache.tajo.storage.index.OrderIndexReader;
+import org.apache.tajo.util.Bytes;
 
 import java.io.Closeable;
 import java.io.FileNotFoundException;
@@ -245,13 +246,6 @@ public class BSTIndex implements IndexMethod {
         keySet.clear();
         rootCollector.clear();
       }
-
-      if (out != null) {
-        out = null;
-      }
-      if (rootOut != null) {
-        rootOut = null;
-      }
     }
 
     private class KeyOffsetCollector {
@@ -305,7 +299,7 @@ public class BSTIndex implements IndexMethod {
     private int offsetCursor;
 
     // mutex
-    private final Integer mutex = 0;
+    private final Object mutex = new Object();
 
     /**
      *
@@ -336,7 +330,8 @@ public class BSTIndex implements IndexMethod {
       // schema
       int schemaByteSize = indexIn.readInt();
       byte [] schemaBytes = new byte[schemaByteSize];
-      indexIn.read(schemaBytes);
+      Bytes.readFully(indexIn, schemaBytes, 0, schemaByteSize);
+
       SchemaProto.Builder builder = SchemaProto.newBuilder();
       builder.mergeFrom(schemaBytes);
       SchemaProto proto = builder.build();
@@ -345,7 +340,8 @@ public class BSTIndex implements IndexMethod {
       // comparator
       int compByteSize = indexIn.readInt();
       byte [] compBytes = new byte[compByteSize];
-      indexIn.read(compBytes);
+      Bytes.readFully(indexIn, compBytes, 0, compByteSize);
+
       TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
       compProto.mergeFrom(compBytes);
       this.comparator = new TupleComparator(compProto.build());
@@ -356,10 +352,11 @@ public class BSTIndex implements IndexMethod {
       this.entryNum = indexIn.readInt();
       if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values
         byte [] minBytes = new byte[indexIn.readInt()];
-        indexIn.read(minBytes);
+        Bytes.readFully(indexIn, minBytes, 0, minBytes.length);
         this.firstKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, minBytes);
+
         byte [] maxBytes = new byte[indexIn.readInt()];
-        indexIn.read(maxBytes);
+        Bytes.readFully(indexIn, maxBytes, 0, maxBytes.length);
         this.lastKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, maxBytes);
       }
     }
@@ -483,7 +480,7 @@ public class BSTIndex implements IndexMethod {
         for (int i = 0; i < entryNum; i++) {
           counter++;
           buf = new byte[in.readInt()];
-          in.read(buf);
+          Bytes.readFully(in, buf, 0, buf.length);
           dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
 
           int offsetNum = in.readInt();
@@ -505,7 +502,7 @@ public class BSTIndex implements IndexMethod {
         byte[] buf;
         for (int i = 0; i < counter; i++) {
           buf = new byte[in.readInt()];
-          in.read(buf);
+          Bytes.readFully(in, buf, 0, buf.length);
           dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
 
           int offsetNum = in.readInt();
@@ -534,7 +531,7 @@ public class BSTIndex implements IndexMethod {
       byte[] buf;
       for (int i = 0; i < entryNum; i++) {
         buf = new byte[in.readInt()];
-        in.read(buf);
+        Bytes.readFully(in, buf, 0, buf.length);
         keyTuple = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
         dataIndex[i] = keyTuple;
         this.offsetIndex[i] = in.readLong();
@@ -574,7 +571,9 @@ public class BSTIndex implements IndexMethod {
       int offset = -1;
       int start = startPos;
       int end = endPos;
-      int centerPos = (start + end) / 2;
+
+      //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
+      int centerPos = (start + end) >>> 1;
       while (true) {
         if (comparator.compare(arr[centerPos], key) > 0) {
           if (centerPos == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
index 512b074..5e200a0 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.rcfile;
 
+import com.google.common.base.Objects;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
@@ -151,6 +152,10 @@ public class BytesRefArrayWritable implements Writable,
     return 0;
   }
 
+  @Override
+  public int hashCode(){
+    return Objects.hashCode(bytesRefWritables);
+  }
   /**
    * Returns <tt>true</tt> if this instance contains one or more the specified
    * BytesRefWritable.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
index 1ffe6ad..c83b505 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
@@ -34,7 +34,7 @@ import java.io.IOException;
 public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> {
 
   private static final byte[] EMPTY_BYTES = new byte[0];
-  public static BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();
+  public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();
 
   int start = 0;
   int length = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncChunkedByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncChunkedByteArrayOutputStream.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncChunkedByteArrayOutputStream.java
deleted file mode 100644
index e6db288..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncChunkedByteArrayOutputStream.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.LinkedList;
-
-/**
- * A thread-not-safe version of ByteArrayOutputStream, which removes all
- * synchronized modifiers.
- */
-public class NonSyncChunkedByteArrayOutputStream extends ByteArrayOutputStream {
-  private LinkedList<byte[]> buffer = new LinkedList<byte[]>();
-  private int chunkSize;
-  private int currentChunk = 0;
-  private int currentPos = 0;
-
-
-  public NonSyncChunkedByteArrayOutputStream(int chunkSize) {
-    super(0);
-    this.chunkSize = chunkSize;
-    buffer.add(new byte[chunkSize]);
-  }
-
-  public NonSyncChunkedByteArrayOutputStream() {
-    this(256 * 1024);
-  }
-
-  public int getLength() {
-    return count;
-  }
-
-  @Override
-  public int size() {
-     return count;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void reset() {
-    count = 0;
-    currentChunk = 0;
-    currentPos = 0;
-    buffer.clear();
-    buffer.add(new byte[chunkSize]);
-    buf = new byte[0];
-
-  }
-
-  private static byte[] vLongBytes = new byte[9];
-  public static int writeVLongToByteArray(byte[] bytes, int offset, long l) {
-    if (l >= -112 && l <= 127) {
-      bytes[offset] = (byte) l;
-      return 1;
-    }
-
-    int len = -112;
-    if (l < 0) {
-      l ^= -1L; // take one's complement'
-      len = -120;
-    }
-
-    long tmp = l;
-    while (tmp != 0) {
-      tmp = tmp >> 8;
-      len--;
-    }
-
-    bytes[offset] = (byte) len;
-
-    len = (len < -120) ? -(len + 120) : -(len + 112);
-
-    for (int idx = len; idx != 0; idx--) {
-      int shiftbits = (idx - 1) * 8;
-      long mask = 0xFFL << shiftbits;
-      bytes[offset+1-(idx - len)] = (byte) ((l & mask) >> shiftbits);
-    }
-    return 1 + len;
-  }
-
-  public int writeVLong(long l) {
-    int len = writeVLongToByteArray(vLongBytes, 0, l);
-    write(vLongBytes, 0, len);
-    return len;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(int b) {
-    ensureBuffer(1);
-    buffer.get(currentChunk)[currentPos] = (byte) b;
-    count += 1;
-    currentPos += 1;
-  }
-
-  private int ensureBuffer(int bytes) {
-    if(currentPos  >= chunkSize) {
-      currentPos = 0;
-      buffer.add(new byte[chunkSize]);
-      currentChunk++;
-    }
-
-    int remainingChunk = buffer.size() -  (currentChunk + 1);
-    int availableSize =  (remainingChunk * chunkSize) + (chunkSize - currentPos);
-
-    while (availableSize <= bytes ){
-      buffer.add(new byte[chunkSize]);
-      availableSize += chunkSize;
-    }
-    return availableSize;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(byte b[]) {
-    write(b, 0, b.length);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(byte b[], int off, int len) {
-    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
-        || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
-    } else if (len == 0) {
-      return;
-    }
-    ensureBuffer(len);
-
-    int remainingBytes = len;
-    while (remainingBytes > 0 ){
-      byte[] chunk = buffer.get(currentChunk);
-
-      if(chunkSize - currentPos > remainingBytes){
-        System.arraycopy(b, off, chunk, currentPos, remainingBytes);
-
-        currentPos += remainingBytes;
-        count += remainingBytes;
-        remainingBytes -= remainingBytes;
-        break;
-      } else {
-        int remainLen = chunkSize - currentPos;
-        System.arraycopy(b, off, chunk, currentPos, remainLen);
-        off += remainLen;
-        remainingBytes -= remainLen;
-        count += remainLen;
-        currentPos = 0;
-        currentChunk++;
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void writeTo(OutputStream out) throws IOException {
-    byte[] buf;
-    int remainingBytes = count;
-    while ((buf = buffer.poll()) != null) {
-      int size = Math.min(buf.length, remainingBytes);
-      out.write(buf, 0, size);
-      remainingBytes -= size;
-    }
-  }
-
-  public void write(DataInput in, int len) throws IOException {
-    if (len == 0) return;
-
-    ensureBuffer(len);
-
-    int remainingBytes = len;
-    while (remainingBytes > 0) {
-      byte[] chunk =  buffer.get(currentChunk);
-      if(chunkSize - currentPos >= remainingBytes){
-        in.readFully(chunk, currentPos, remainingBytes);
-        currentPos += remainingBytes;
-        count += remainingBytes;
-        remainingBytes -= remainingBytes;
-      } else {
-        int remainLen = chunk.length - currentPos;
-        in.readFully(chunk, currentPos, remainLen);
-        remainingBytes -= remainLen;
-        count += remainLen;
-        currentPos = 0;
-        currentChunk++;
-      }
-    }
-  }
-
-  public byte[] toArray(){
-    byte[] b = new byte[count];
-    int remainingBytes = count;
-
-    for (byte[] bytes : buffer){
-      int size = Math.min(bytes.length, remainingBytes);
-      System.arraycopy(bytes, 0, b, count - remainingBytes, size);
-      remainingBytes -= size;
-    }
-    return b;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 9fe0bce..a6b8781 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -38,6 +38,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.Bytes;
 
 import java.io.*;
 import java.rmi.server.UID;
@@ -486,7 +487,7 @@ public class RCFile {
         }
 
         if (skipTotal != 0) {
-          in.skipBytes(skipTotal);
+          Bytes.skipFully(in, skipTotal);
           skipTotal = 0;
         }
 
@@ -524,7 +525,7 @@ public class RCFile {
       }
 
       if (skipTotal != 0) {
-        in.skipBytes(skipTotal);
+        Bytes.skipFully(in, skipTotal);
       }
     }
 
@@ -569,7 +570,6 @@ public class RCFile {
    * compatible with SequenceFile's.
    */
   public static class RCFileAppender extends FileAppender {
-    Configuration conf;
     FSDataOutputStream out;
 
     CompressionCodec codec = null;
@@ -583,7 +583,7 @@ public class RCFile {
     // the max size of memory for buffering records before writes them out
     private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 4M
     // the conf string for COLUMNS_BUFFER_SIZE
-    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+    public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
 
     // how many records already buffered
     private int bufferedRecords = 0;
@@ -703,8 +703,6 @@ public class RCFile {
     public RCFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
       super(conf, schema, meta, path);
 
-      this.conf = conf;
-      this.fs = path.getFileSystem(conf);
       RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
       COLUMNS_BUFFER_SIZE = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR, COLUMNS_BUFFER_SIZE);
       columnNumber = schema.getColumnNum();
@@ -812,7 +810,6 @@ public class RCFile {
 
     void init(Configuration conf, FSDataOutputStream out,
               CompressionCodec codec, Metadata metadata) throws IOException {
-      this.conf = conf;
       this.out = out;
       this.codec = codec;
       this.metadata = metadata;
@@ -891,18 +888,10 @@ public class RCFile {
       }
 
       bufferedRecords++;
-      if (this.isCompressed()) {
-        //TODO compression rate base flush
-        if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
-            || (bufferedRecords >= RECORD_INTERVAL)) {
-          flushRecords();
-        }
-      } else {
-        //TODO block base flush
-        if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
-            || (bufferedRecords >= RECORD_INTERVAL)) {
-          flushRecords();
-        }
+      //TODO compression rate base flush
+      if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
+          || (bufferedRecords >= RECORD_INTERVAL)) {
+        flushRecords();
       }
     }
 
@@ -977,42 +966,50 @@ public class RCFile {
         deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
       }
 
-      for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
-        ColumnBuffer currentBuf = columnBuffers[columnIndex];
-        currentBuf.flushGroup();
-
-        NonSyncByteArrayOutputStream columnValue = currentBuf.columnValBuffer;
-        int colLen;
-        int plainLen = columnValue.getLength();
-        if (isCompressed) {
-          deflateFilter.resetState();
-          deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
-          deflateOut.flush();
-          deflateFilter.finish();
-          columnValue.close();
-          // find how much compressed data was added for this column
-          colLen = valueBuffer.getLength() - valueLength;
-          currentBuf.columnValueLength = colLen;
-        } else {
-          colLen = plainLen;
+      try {
+        for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
+          ColumnBuffer currentBuf = columnBuffers[columnIndex];
+          currentBuf.flushGroup();
+
+          NonSyncByteArrayOutputStream columnValue = currentBuf.columnValBuffer;
+          int colLen;
+          int plainLen = columnValue.getLength();
+          if (isCompressed) {
+            deflateFilter.resetState();
+            deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
+            deflateOut.flush();
+            deflateFilter.finish();
+            columnValue.close();
+            // find how much compressed data was added for this column
+            colLen = valueBuffer.getLength() - valueLength;
+            currentBuf.columnValueLength = colLen;
+          } else {
+            colLen = plainLen;
+          }
+          valueLength += colLen;
         }
-        valueLength += colLen;
+      } catch (IOException e) {
+        IOUtils.cleanup(LOG, deflateOut);
+        throw e;
       }
 
-      int keyLength = getKeyBufferSize();
-      if (keyLength < 0) {
-        throw new IOException("negative length keys not allowed: " + keyLength);
-      }
       if (compressor != null) {
         org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
       }
 
+      int keyLength = getKeyBufferSize();
+      if (keyLength < 0) {
+        throw new IOException("negative length keys not allowed: " + keyLength);
+      }
       // Write the key out
       writeKey(keyLength + valueLength, keyLength);
       // write the value out
       if (isCompressed) {
-        out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
-        valueBuffer.close();
+        try {
+          out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
+        } finally {
+          IOUtils.cleanup(LOG, valueBuffer, deflateOut, deflateFilter);
+        }
       } else {
         for (int columnIndex = 0; columnIndex < columnNumber; ++columnIndex) {
           columnBuffers[columnIndex].columnValBuffer.writeTo(out);
@@ -1077,7 +1074,7 @@ public class RCFile {
     }
 
     @Override
-    public synchronized void close() throws IOException {
+    public void close() throws IOException {
       if (bufferedRecords > 0) {
         flushRecords();
       }
@@ -1124,8 +1121,6 @@ public class RCFile {
     private int currentKeyLength;
     private int currentRecordLength;
 
-    private final Configuration conf;
-
     private ValueBuffer currentValue;
 
     private int readRowsIndexInBuffer = 0;
@@ -1164,7 +1159,6 @@ public class RCFile {
 
       startOffset = fragment.getStartKey();
       endOffset = startOffset + fragment.getEndKey();
-      this.conf = conf;
       more = startOffset < endOffset;
       start = 0;
     }
@@ -1362,7 +1356,7 @@ public class RCFile {
     /**
      * Return the current byte position in the input file.
      */
-    public synchronized long getPosition() throws IOException {
+    public long getPosition() throws IOException {
       return in.getPos();
     }
 
@@ -1376,7 +1370,7 @@ public class RCFile {
      * words, the current seek can only seek to the end of the file. For other
      * positions, use {@link RCFile.RCFileScanner#sync(long)}.
      */
-    public synchronized void seek(long position) throws IOException {
+    public void seek(long position) throws IOException {
       in.seek(position);
     }
 
@@ -1387,7 +1381,7 @@ public class RCFile {
      * Otherwise, the seek or sync will have no effect, it will continue to get rows from the
      * buffer built up from the call to next.
      */
-    public synchronized void resetBuffer() {
+    public void resetBuffer() {
       readRowsIndexInBuffer = 0;
       recordsNumInValBuffer = 0;
     }
@@ -1395,7 +1389,7 @@ public class RCFile {
     /**
      * Seek to the next sync mark past a given position.
      */
-    public synchronized void sync(long position) throws IOException {
+    public void sync(long position) throws IOException {
       if (position + SYNC_SIZE >= end) {
         seek(end);
         return;
@@ -1463,7 +1457,7 @@ public class RCFile {
      * @return the length of the next record or -1 if there is no next record
      * @throws IOException
      */
-    private synchronized int readRecordLength() throws IOException {
+    private int readRecordLength() throws IOException {
       if (in.getPos() >= end) {
         return -1;
       }
@@ -1492,7 +1486,7 @@ public class RCFile {
         return;
       }
       if (!currentValue.inited) {
-        in.skip(currentRecordLength - currentKeyLength);
+        IOUtils.skipFully(in, currentRecordLength - currentKeyLength);
       }
     }
 
@@ -1612,7 +1606,7 @@ public class RCFile {
      * @return next row number
      * @throws IOException
      */
-    public synchronized boolean nextBuffer(LongWritable readRows) throws IOException {
+    public boolean nextBuffer(LongWritable readRows) throws IOException {
       if (readRowsIndexInBuffer < recordsNumInValBuffer) {
         readRows.set(passedRowsNum);
         readRowsIndexInBuffer++;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
index 4165afe..2c2037f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
@@ -41,7 +41,6 @@ import java.nio.ByteBuffer;
 import static org.apache.tajo.common.TajoDataTypes.DataType;
 
 public class TrevniScanner extends FileScanner {
-  private boolean inited = false;
   private ColumnFileReader reader;
   private int [] projectionMap;
   private ColumnValues [] columns;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
index 3cc4edd..b93672b 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
@@ -27,21 +27,16 @@ import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.compress.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.storage.LazyTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Bytes;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
 public class CSVFileScanner extends FileScannerV2 {
   public static final String DELIMITER = "csvfile.delimiter";
   public static final String DELIMITER_DEFAULT = "|";
@@ -165,7 +160,7 @@ public class CSVFileScanner extends FileScannerV2 {
 
   @Override
   public boolean isStopScanScheduling() {
-    if(sin != null && sin.IsEndOfStream()) {
+    if(sin != null && sin.isEndOfStream()) {
       return true;
     } else {
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
index 8ecf1de..1babf99 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
 
-public class DiskFileScanScheduler extends Thread {
+public final class DiskFileScanScheduler extends Thread {
   private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
 
 	private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
index d9b0dd2..56100f2 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.storage.v2;
 
+import com.google.common.base.Objects;
+
 public class DiskMountInfo implements Comparable<DiskMountInfo> {
 	private String mountPath;
 	
@@ -58,6 +60,19 @@ public class DiskMountInfo implements Comparable<DiskMountInfo> {
 		return deviceId;
 	}
 
+  @Override
+  public boolean equals(Object obj){
+    if (!(obj instanceof DiskMountInfo)) return false;
+
+    if (compareTo((DiskMountInfo) obj) == 0) return true;
+    else return false;
+  }
+
+  @Override
+  public int hashCode(){
+    return Objects.hashCode(mountPath);
+  }
+
 	@Override
 	public int compareTo(DiskMountInfo other) {
 		String path1 = mountPath;
@@ -76,7 +91,7 @@ public class DiskMountInfo implements Comparable<DiskMountInfo> {
 			
 			if(path1Length < path2Length) {
 				return 1;
-			} else if(path1Length > path1Length) {
+			} else if(path1Length > path2Length) {
 				return -1;
 			} else {
 				return path1.compareTo(path2);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
index c50e244..bb90c39 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
@@ -32,7 +32,9 @@ import java.util.TreeSet;
 
 public class DiskUtil {
 
-	public enum OSType {
+  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
+
+  public enum OSType {
 		OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
 	}
 
@@ -70,7 +72,7 @@ public class DiskUtil {
 	private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
 		List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
 		
-		File file = new File("/proc/partitions");
+		File file = new File(UNIX_DISK_DEVICE_PATH);
 		if(!file.exists()) {
 			System.out.println("No partition file:" + file.getAbsolutePath());
 			return getDefaultDiskDeviceInfos();
@@ -78,7 +80,7 @@ public class DiskUtil {
 		
 		BufferedReader reader = null;
 		try {
-			reader = new BufferedReader(new InputStreamReader(new FileInputStream("/proc/partitions")));
+			reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
 			String line = null;
 			
 			int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
index 10f12be..07fbe6c 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
@@ -18,15 +18,10 @@
 
 package org.apache.tajo.storage.v2;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class FileScanRunner extends Thread {
   private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
@@ -55,8 +50,8 @@ public class FileScanRunner extends Thread {
 
 	public void run() {
     try {
-      long startTime = System.currentTimeMillis();
-      boolean fetching = fileScanner.isFetchProcessing();
+//      long startTime = System.currentTimeMillis();
+//      boolean fetching = fileScanner.isFetchProcessing();
       fileScanner.scan(maxReadBytes);
 //      if(diskFileScanScheduler.getDiskId() == 1) {
 //        LOG.info("========>" + diskFileScanScheduler.getDiskId() + "," + fileScanner.getId() +

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ac8c77c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
index acddf79..4b79c51 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
@@ -18,37 +18,24 @@
 
 package org.apache.tajo.storage.v2;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.rmi.server.UID;
-import java.security.MessageDigest;
-import java.util.Arrays;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.Metadata;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VersionMismatchException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.rcfile.*;
+import org.apache.tajo.util.Bytes;
+
+import java.io.*;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import java.util.Arrays;
 
 /**
  * <code>RCFile</code>s, short of Record Columnar File, are flat files
@@ -311,6 +298,16 @@ public class RCFile {
       throw new RuntimeException("compareTo not supported in class "
           + this.getClass().getName());
     }
+
+    @Override
+    public boolean equals(Object obj){
+      return super.equals(obj);
+    }
+
+    @Override
+    public int hashCode(){
+      return super.hashCode();
+    }
   }
 
   /**
@@ -488,7 +485,7 @@ public class RCFile {
         }
 
         if (skipTotal != 0) {
-          in.skipBytes(skipTotal);
+          Bytes.skipFully(in, skipTotal);
           skipTotal = 0;
         }
 
@@ -515,7 +512,7 @@ public class RCFile {
       }
 
       if (skipTotal != 0) {
-        in.skipBytes(skipTotal);
+        Bytes.skipFully(in, skipTotal);
       }
     }
 
@@ -551,6 +548,16 @@ public class RCFile {
       throw new RuntimeException("compareTo not supported in class "
           + this.getClass().getName());
     }
+
+    @Override
+    public boolean equals(Object obj){
+      return super.equals(obj);
+    }
+
+    @Override
+    public int hashCode(){
+      return super.hashCode();
+    }
   }
 
   /**
@@ -604,7 +611,7 @@ public class RCFile {
     // the max size of memory for buffering records before writes them out
     private int columnsBufferSize = 4 * 1024 * 1024; // 4M
     // the conf string for COLUMNS_BUFFER_SIZE
-    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+    public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
 
     // how many records already buffered
     private int bufferedRecords = 0;
@@ -624,7 +631,7 @@ public class RCFile {
     /*
      * used for buffering appends before flush them out
      */
-    class ColumnBuffer {
+    static class ColumnBuffer {
       // used for buffer a column's values
       NonSyncDataOutputBuffer columnValBuffer;
       // used to store each value's length
@@ -917,34 +924,39 @@ public class RCFile {
         deflateOut = new DataOutputStream(deflateFilter);
       }
 
-      for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
-        ColumnBuffer currentBuf = columnBuffers[columnIndex];
-        currentBuf.flushGroup();
-
-        NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
-        int colLen;
-        int plainLen = columnValuePlainLength[columnIndex];
-
-        if (isCompressed) {
-          if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
-            ((SchemaAwareCompressionOutputStream)deflateFilter).
-              setColumnIndex(columnIndex);
+      try {
+        for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
+          ColumnBuffer currentBuf = columnBuffers[columnIndex];
+          currentBuf.flushGroup();
+
+          NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+          int colLen;
+          int plainLen = columnValuePlainLength[columnIndex];
+
+          if (isCompressed) {
+            if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
+              ((SchemaAwareCompressionOutputStream) deflateFilter).
+                  setColumnIndex(columnIndex);
+            }
+            deflateFilter.resetState();
+            deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
+            deflateOut.flush();
+            deflateFilter.finish();
+            // find how much compressed data was added for this column
+            colLen = valueBuffer.getLength() - valueLength;
+          } else {
+            colLen = columnValuePlainLength[columnIndex];
           }
-          deflateFilter.resetState();
-          deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
-          deflateOut.flush();
-          deflateFilter.finish();
-          // find how much compressed data was added for this column
-          colLen = valueBuffer.getLength() - valueLength;
-        } else {
-          colLen = columnValuePlainLength[columnIndex];
+          valueLength += colLen;
+          key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
+              columnIndex);
+          plainTotalColumnLength[columnIndex] += plainLen;
+          comprTotalColumnLength[columnIndex] += colLen;
+          columnValuePlainLength[columnIndex] = 0;
         }
-        valueLength += colLen;
-        key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
-          columnIndex);
-        plainTotalColumnLength[columnIndex] += plainLen;
-        comprTotalColumnLength[columnIndex] += colLen;
-        columnValuePlainLength[columnIndex] = 0;
+      } catch (IOException e) {
+        IOUtils.cleanup(LOG, deflateOut);
+        throw e;
       }
 
       int keyLength = key.getSize();
@@ -1020,7 +1032,7 @@ public class RCFile {
       }
     }
 
-    public synchronized void close() throws IOException {
+    public void close() throws IOException {
       if (bufferedRecords > 0) {
         flushRecords();
       }
@@ -1297,7 +1309,7 @@ public class RCFile {
     }
 
     /** Return the current byte position in the input file. */
-    public synchronized long getPosition() throws IOException {
+    public long getPosition() throws IOException {
       return sin.getPos();
     }
 
@@ -1311,7 +1323,7 @@ public class RCFile {
      * words, the current seek can only seek to the end of the file. For other
      * positions, use {@link org.apache.tajo.storage.v2.RCFile.Reader#sync(long)}.
      */
-    public synchronized void seek(long position) throws IOException {
+    public void seek(long position) throws IOException {
       sin.seek(position);
       sin.readNext(128 * 1024);
     }
@@ -1323,13 +1335,13 @@ public class RCFile {
      * Otherwise, the seek or sync will have no effect, it will continue to get rows from the
      * buffer built up from the call to next.
      */
-    public synchronized void resetBuffer() {
+    public void resetBuffer() {
       readRowsIndexInBuffer = 0;
       recordsNumInValBuffer = 0;
     }
 
     /** Seek to the next sync mark past a given position. */
-    public synchronized void sync(long position) throws IOException {
+    public void sync(long position) throws IOException {
       if (position + SYNC_SIZE >= end) {
         seek(end);
         return;
@@ -1393,7 +1405,7 @@ public class RCFile {
      * @return the length of the next record or -1 if there is no next record
      * @throws java.io.IOException
      */
-    private synchronized int readRecordLength() throws IOException {
+    private int readRecordLength() throws IOException {
     	if (sin.getPos() >= end) {
         return -1;
       }
@@ -1422,7 +1434,7 @@ public class RCFile {
         return;
       }
       if (!currentValue.inited) {
-        sin.skip(currentRecordLength - currentKeyLength);
+        IOUtils.skipFully(sin, currentRecordLength - currentKeyLength);
       }
     }
 
@@ -1578,7 +1590,7 @@ public class RCFile {
      */
     @SuppressWarnings("unused")
     @Deprecated
-    public synchronized boolean nextColumnsBatch() throws IOException {
+    public boolean nextColumnsBatch() throws IOException {
       passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
       return nextKeyBuffer() > 0;
     }
@@ -1593,7 +1605,7 @@ public class RCFile {
      * @return next row number
      * @throws java.io.IOException
      */
-    public synchronized boolean next(LongWritable readRows) throws IOException {
+    public boolean next(LongWritable readRows) throws IOException {
       if (hasRecordsInBuffer()) {
         readRows.set(passedRowsNum);
         readRowsIndexInBuffer++;
@@ -1656,7 +1668,7 @@ public class RCFile {
      *
      * @throws java.io.IOException
      */
-    public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
+    public void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
 
       if (!keyInit || rowFetched) {
         return;


Mime
View raw message