tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/3] TAJO-176: Implement Tajo JDBC Driver. (Keuntae Park via jihoon)
Date Mon, 18 Nov 2013 08:56:46 GMT
Updated Branches:
  refs/heads/master 6e2db3baf -> 342fd47ff


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
new file mode 100644
index 0000000..f4d685f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
@@ -0,0 +1,1129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.*;
+import java.util.Calendar;
+import java.util.Map;
+
+public abstract class TajoResultSetBase implements ResultSet {
+  protected int curRow;
+  protected long totalRow;
+  protected boolean wasNull;
+  protected Schema schema;
+  protected Tuple cur;
+
+  protected void init() {
+    cur = null;
+    curRow = 0;
+    totalRow = 0;
+    wasNull = false;
+  }
+
+  private void handleNull(Datum d) {
+    wasNull = (d instanceof NullDatum);
+  }
+
+  @Override // Rewinds the cursor to before the first row by delegating to init().
+  public void beforeFirst() throws SQLException {
+    init(); // NOTE(review): init() also zeroes totalRow, and next() returns false while totalRow <= 0 -- subclasses presumably override init()/beforeFirst() to restore it; confirm.
+  }
+
+  @Override
+  public boolean getBoolean(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asBool();
+  }
+
+  @Override
+  public boolean getBoolean(String colName) throws SQLException {
+    Datum datum = cur.get(findColumn(colName));
+    handleNull(datum);
+    return datum.asBool();
+  }
+
+  @Override
+  public byte getByte(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asByte();
+  }
+
+  @Override
+  public byte getByte(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asByte();
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asByteArray();
+  }
+
+  @Override
+  public byte[] getBytes(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asByteArray();
+  }
+
+  @Override
+  public double getDouble(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asFloat8();
+  }
+
+  @Override
+  public double getDouble(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asFloat8();
+  }
+
+  @Override
+  public float getFloat(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asFloat4();
+  }
+
+  @Override
+  public float getFloat(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asFloat4();
+  }
+
+  @Override
+  public int getInt(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asInt4();
+  }
+
+  @Override
+  public int getInt(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asInt4();
+  }
+
+  @Override
+  public long getLong(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asInt8();
+  }
+
+  @Override
+  public long getLong(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asInt8();
+  }
+
+  /**
+   * Returns the value of the given column as a Java object whose class is
+   * chosen from the column's declared type in the schema.
+   *
+   * @param fieldId 1-based column index
+   * @return the column value, or {@code null} when the stored datum is a
+   *         NullDatum (wasNull() then reports {@code true})
+   * @throws SQLException declared by the ResultSet interface
+   */
+  @Override
+  public Object getObject(int fieldId) throws SQLException {
+    Datum d = cur.get(fieldId - 1);
+    handleNull(d);
+    if (wasNull) {
+      // A SQL NULL maps to a Java null; getDate(int) checks for exactly this.
+      return null;
+    }
+
+    TajoDataTypes.Type dataType = schema.getColumn(fieldId - 1).getDataType().getType();
+
+    switch(dataType) {
+      case BOOLEAN:  return d.asBool();
+      case INT1:
+      case INT2: return d.asInt2();
+      case INT4: return d.asInt4();
+      case INT8: return d.asInt8();
+      case TEXT:
+      case CHAR:
+      case DATE:
+      case VARCHAR:  return d.asChars();
+      case FLOAT4:  return d.asFloat4();
+      case FLOAT8:  return d.asFloat8();
+      case DECIMAL:
+      case NUMERIC:  return d.asFloat8();
+      // Any other type is surfaced through its character representation.
+      default: return d.asChars();
+    }
+  }
+
+  /**
+   * Returns the value of the named column as a Java object.
+   *
+   * @param name column name to look up
+   * @return the column value as produced by {@link #getObject(int)}
+   * @throws SQLException declared by the ResultSet interface
+   */
+  @Override
+  public Object getObject(String name) throws SQLException {
+    // Every other name-based getter in this class feeds findColumn() directly
+    // to cur.get(), i.e. treats it as a 0-based position, while getObject(int)
+    // expects a 1-based index and subtracts one itself.
+    return getObject(findColumn(name) + 1);
+  }
+
+  @Override
+  public short getShort(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asInt2();
+  }
+
+  @Override
+  public short getShort(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asInt2();
+  }
+
+  @Override
+  public String getString(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    handleNull(datum);
+    return datum.asChars();
+  }
+
+  @Override
+  public String getString(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    handleNull(datum);
+    return datum.asChars();
+  }
+
+  /**
+   * Reports whether this result set can be viewed as the requested type.
+   *
+   * @param clazz the interface or class being asked for
+   * @return {@code true} when this object is an instance of {@code clazz};
+   *         {@code false} otherwise, including for a {@code null} argument
+   * @throws SQLException declared by the Wrapper interface
+   */
+  @Override
+  public boolean isWrapperFor(Class<?> clazz) throws SQLException {
+    // This class wraps nothing, so the only possible match is itself.
+    return clazz != null && clazz.isInstance(this);
+  }
+
+  /**
+   * Returns this result set cast to the requested type when it implements it.
+   *
+   * @param clazz the interface or class to unwrap to
+   * @return this object viewed as {@code clazz}
+   * @throws SQLException (as SQLFeatureNotSupportedException) when this object
+   *         is not an instance of {@code clazz}
+   */
+  @Override
+  public <T> T unwrap(Class<T> clazz) throws SQLException {
+    if (clazz != null && clazz.isInstance(this)) {
+      return clazz.cast(this);
+    }
+    // Same exception type and message as before for every unsupported target.
+    throw new SQLFeatureNotSupportedException("unwrap not supported");
+  }
+
+  @Override
+  public boolean absolute(int row) throws SQLException {
+    throw new SQLFeatureNotSupportedException("absolute not supported");
+  }
+
+  /**
+   * Moves the cursor past the final row by consuming every remaining row
+   * through next().
+   *
+   * @throws SQLException if next() fails while advancing
+   */
+  @Override
+  public void afterLast() throws SQLException {
+    boolean hasMore = this.next();
+    while (hasMore) {
+      hasMore = this.next();
+    }
+  }
+
+  @Override
+  public void cancelRowUpdates() throws SQLException {
+    throw new SQLFeatureNotSupportedException("cancelRowUpdates not supported");
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    throw new SQLFeatureNotSupportedException("clearWarnings not supported");
+  }
+
+  @Override
+  public void deleteRow() throws SQLException {
+    throw new SQLFeatureNotSupportedException("deleteRow not supported");
+  }
+
+  @Override // Resolves a column name to the id reported by the schema.
+  public int findColumn(String colName) throws SQLException {
+    return schema.getColumnIdByName(colName); // NOTE(review): name-based getters here pass this value straight to cur.get(), i.e. treat it as a 0-based tuple index, whereas JDBC specifies findColumn as 1-based -- confirm.
+  }
+
+  @Override
+  public boolean first() throws SQLException {
+    this.beforeFirst();
+    return this.next();
+  }
+
+  @Override
+  public Array getArray(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getArray not supported");
+  }
+
+  @Override
+  public Array getArray(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getArray not supported");
+  }
+
+  @Override
+  public InputStream getAsciiStream(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getAsciiStream not supported");
+  }
+
+  @Override
+  public InputStream getAsciiStream(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getAsciiStream not supported");
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBigDecimal not supported");
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBigDecimal not supported");
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int index, int x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBigDecimal not supported");
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String name, int x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBigDecimal not supported");
+  }
+
+  @Override
+  public InputStream getBinaryStream(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBinaryStream not supported");
+  }
+
+  @Override
+  public InputStream getBinaryStream(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBinaryStream not supported");
+  }
+
+  @Override
+  public Blob getBlob(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBlob not supported");
+  }
+
+  @Override
+  public Blob getBlob(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getBlob not supported");
+  }
+
+  @Override
+  public Reader getCharacterStream(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getCharacterStream not supported");
+  }
+
+  @Override
+  public Reader getCharacterStream(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getCharacterStream not supported");
+  }
+
+  @Override
+  public Clob getClob(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getClob not supported");
+  }
+
+  @Override
+  public Clob getClob(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getClob not supported");
+  }
+
+  @Override
+  public int getConcurrency() throws SQLException {
+    return ResultSet.CONCUR_READ_ONLY;
+  }
+
+  @Override
+  public String getCursorName() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getCursorName not supported");
+  }
+
+  /**
+   * Returns the given column as a java.sql.Date by parsing the value that
+   * {@link #getObject(int)} yields with {@link Date#valueOf(String)}.
+   *
+   * @param index 1-based column index
+   * @return the parsed date, or {@code null} when getObject returns null
+   * @throws SQLException when the value is not a String or cannot be parsed;
+   *         the underlying failure is attached as the cause
+   */
+  @Override
+  public Date getDate(int index) throws SQLException {
+    Object obj = getObject(index);
+    if (obj == null) {
+      return null;
+    }
+
+    try {
+      return Date.valueOf((String) obj);
+    } catch (Exception e) {
+      // Keep the original failure as the cause so its stack trace is not lost.
+      throw new SQLException("Cannot convert column " + index
+          + " to date: " + e.toString(), e);
+    }
+  }
+
+  /**
+   * Returns the named column as a java.sql.Date.
+   *
+   * @param name column name to look up
+   * @return the parsed date as produced by {@link #getDate(int)}
+   * @throws SQLException when the value cannot be converted to a date
+   */
+  @Override
+  public Date getDate(String name) throws SQLException {
+    // findColumn() is used as a 0-based position by the other name-based
+    // getters in this class; getDate(int) expects a 1-based index.
+    return getDate(findColumn(name) + 1);
+  }
+
+  @Override
+  public Date getDate(int index, Calendar x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getDate not supported");
+  }
+
+  @Override
+  public Date getDate(String name, Calendar x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getDate not supported");
+  }
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    return ResultSet.FETCH_FORWARD;
+  }
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getFetchSize not supported");
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getHoldability not supported");
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    return new TajoResultSetMetaData(schema);
+  }
+
+  @Override
+  public Reader getNCharacterStream(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getNCharacterStream not supported");
+  }
+
+  @Override
+  public Reader getNCharacterStream(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getNCharacterStream not supported");
+  }
+
+  @Override
+  public NClob getNClob(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getNClob not supported");
+  }
+
+  @Override
+  public NClob getNClob(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getNClob not supported");
+  }
+
+  @Override
+  public String getNString(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getNString not supported");
+  }
+
+  @Override
+  public String getNString(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getNString not supported");
+  }
+
+  @Override
+  public Object getObject(int index, Map<String, Class<?>> x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getObject not supported");
+  }
+
+  @Override
+  public Object getObject(String name, Map<String, Class<?>> x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getObject not supported");
+  }
+
+  public <T> T getObject(String name, Class<T> x)
+      throws SQLException {
+    //JDK 1.7
+    throw new SQLFeatureNotSupportedException("getObject not supported");
+  }
+
+  public <T> T getObject(int index, Class<T> x)
+      throws SQLException {
+    //JDK 1.7
+    throw new SQLFeatureNotSupportedException("getObject not supported");
+  }
+
+  @Override
+  public Ref getRef(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getRef not supported");
+  }
+
+  @Override
+  public Ref getRef(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getRef not supported");
+  }
+
+  @Override
+  public int getRow() throws SQLException {
+    return curRow;
+  }
+
+  @Override
+  public RowId getRowId(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getRowId not supported");
+  }
+
+  @Override
+  public RowId getRowId(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getRowId not supported");
+  }
+
+  @Override
+  public SQLXML getSQLXML(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getSQLXML not supported");
+  }
+
+  @Override
+  public SQLXML getSQLXML(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getSQLXML not supported");
+  }
+
+  @Override
+  public Statement getStatement() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getStatement not supported");
+  }
+
+  @Override
+  public Time getTime(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTime not supported");
+  }
+
+  @Override
+  public Time getTime(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTime not supported");
+  }
+
+  @Override
+  public Time getTime(int index, Calendar x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTime not supported");
+  }
+
+  @Override
+  public Time getTime(String name, Calendar x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTime not supported");
+  }
+
+  @Override
+  public Timestamp getTimestamp(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTimestamp not supported");
+  }
+
+  @Override
+  public Timestamp getTimestamp(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTimestamp not supported");
+  }
+
+  @Override
+  public Timestamp getTimestamp(int index, Calendar x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTimestamp not supported");
+  }
+
+  @Override
+  public Timestamp getTimestamp(String name, Calendar x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getTimestamp not supported");
+  }
+
+  @Override
+  public int getType() throws SQLException {
+    return ResultSet.TYPE_FORWARD_ONLY;
+  }
+
+  @Override
+  public URL getURL(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getURL not supported");
+  }
+
+  @Override
+  public URL getURL(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getURL not supported");
+  }
+
+  @Override
+  public InputStream getUnicodeStream(int index) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getUnicodeStream not supported");
+  }
+
+  @Override
+  public InputStream getUnicodeStream(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getUnicodeStream not supported");
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getWarnings not supported");
+  }
+
+  @Override
+  public void insertRow() throws SQLException {
+    throw new SQLFeatureNotSupportedException("insertRow not supported");
+  }
+
+  @Override
+  public boolean isAfterLast() throws SQLException {
+    return this.curRow > this.totalRow;
+  }
+
+  @Override
+  public boolean isBeforeFirst() throws SQLException {
+    return this.curRow == 0;
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return this.curRow == -1;
+  }
+
+  @Override
+  public boolean isFirst() throws SQLException {
+    return this.curRow == 1;
+  }
+
+  @Override
+  public boolean isLast() throws SQLException {
+    return this.curRow == this.totalRow;
+  }
+
+  /**
+   * Advances the cursor to the last row by reading forward until next()
+   * reports no more rows.
+   *
+   * @return {@code true} when the cursor ends up on a row; {@code false} when
+   *         there is no row to land on (empty result, or already past the end)
+   * @throws SQLException if next() fails while advancing
+   */
+  @Override
+  public boolean last() throws SQLException {
+    // Start from the current position so a cursor that is already on the
+    // last row keeps its tuple instead of being reset to null.
+    Tuple lastTuple = cur;
+    int lastRow = curRow;
+    while (this.next()) {
+      lastTuple = cur;
+      lastRow = curRow;
+    }
+    // next() bumps curRow even on the final, failing call; restore the row
+    // number that belongs to the tuple we are actually positioned on.
+    cur = lastTuple;
+    curRow = lastRow;
+    return lastTuple != null;
+  }
+
+  @Override
+  public void moveToCurrentRow() throws SQLException {
+    throw new SQLFeatureNotSupportedException("moveToCurrentRow not supported");
+  }
+
+  @Override
+  public void moveToInsertRow() throws SQLException {
+    throw new SQLFeatureNotSupportedException("moveToInsertRow not supported");
+  }
+
+  /**
+   * Moves the cursor forward one row by fetching the next tuple.
+   *
+   * @return {@code true} when a tuple was fetched; {@code false} when
+   *         totalRow is not positive or nextTuple() returned null
+   * @throws SQLException wrapping any IOException raised by nextTuple(),
+   *         with the IOException attached as the cause
+   */
+  @Override
+  public boolean next() throws SQLException {
+    if (totalRow <= 0) {
+      return false;
+    }
+
+    try {
+      cur = nextTuple();
+    } catch (IOException e) {
+      // Preserve the I/O failure as the cause instead of only its message.
+      throw new SQLException(e.getMessage(), e);
+    }
+    // Counted even when no tuple came back, so isAfterLast() can observe
+    // curRow moving past totalRow.
+    curRow++;
+    return cur != null;
+  }
+
+  protected abstract Tuple nextTuple() throws IOException;
+
+  @Override
+  public boolean previous() throws SQLException {
+    throw new SQLFeatureNotSupportedException("previous not supported");
+  }
+
+  @Override
+  public void refreshRow() throws SQLException {
+    throw new SQLFeatureNotSupportedException("refreshRow not supported");
+  }
+
+  @Override
+  public boolean relative(int rows) throws SQLException {
+    throw new SQLFeatureNotSupportedException("relative not supported");
+  }
+
+  @Override
+  public boolean rowDeleted() throws SQLException {
+    throw new SQLFeatureNotSupportedException("rowDeleted not supported");
+  }
+
+  @Override
+  public boolean rowInserted() throws SQLException {
+    throw new SQLFeatureNotSupportedException("rowInserted not supported");
+  }
+
+  @Override
+  public boolean rowUpdated() throws SQLException {
+    throw new SQLFeatureNotSupportedException("rowUpdated not supported");
+  }
+
+  @Override
+  public void setFetchDirection(int direction) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setFetchDirection not supported");
+  }
+
+  @Override
+  public void setFetchSize(int size) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setFetchSize not supported");
+  }
+
+  @Override
+  public void updateArray(int index, Array x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateArray not supported");
+  }
+
+  @Override
+  public void updateArray(String name, Array x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateArray not supported");
+  }
+
+  @Override
+  public void updateAsciiStream(int index, InputStream x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateAsciiStream not supported");
+  }
+
+  @Override
+  public void updateAsciiStream(String name, InputStream x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateAsciiStream not supported");
+  }
+
+  @Override
+  public void updateAsciiStream(int index, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateAsciiStream not supported");
+  }
+
+  @Override
+  public void updateAsciiStream(String name, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateAsciiStream not supported");
+  }
+
+  @Override
+  public void updateAsciiStream(int index, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateAsciiStream not supported");
+  }
+
+  @Override
+  public void updateAsciiStream(String name, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateAsciiStream not supported");
+  }
+
+  @Override
+  public void updateBigDecimal(int index, BigDecimal x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBigDecimal not supported");
+  }
+
+  @Override
+  public void updateBigDecimal(String name, BigDecimal x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBigDecimal not supported");
+  }
+
+  @Override
+  public void updateBinaryStream(int index, InputStream x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBinaryStream not supported");
+  }
+
+  @Override
+  public void updateBinaryStream(String name, InputStream x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBinaryStream not supported");
+  }
+
+  @Override
+  public void updateBinaryStream(int index, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBinaryStream not supported");
+  }
+
+  @Override
+  public void updateBinaryStream(String name, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBinaryStream not supported");
+  }
+
+  @Override
+  public void updateBinaryStream(int index, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBinaryStream not supported");
+  }
+
+  @Override
+  public void updateBinaryStream(String name, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBinaryStream not supported");
+  }
+
+  @Override
+  public void updateBlob(int index, Blob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBlob not supported");
+  }
+
+  @Override
+  public void updateBlob(String name, Blob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBlob not supported");
+  }
+
+  @Override
+  public void updateBlob(int index, InputStream x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBlob not supported");
+  }
+
+  @Override
+  public void updateBlob(String name, InputStream x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBlob not supported");
+  }
+
+  @Override
+  public void updateBlob(int index, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBlob not supported");
+  }
+
+  @Override
+  public void updateBlob(String name, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBlob not supported");
+  }
+
+  @Override
+  public void updateBoolean(int index, boolean x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBoolean not supported");
+  }
+
+  @Override
+  public void updateBoolean(String name, boolean x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBoolean not supported");
+  }
+
+  @Override
+  public void updateByte(int index, byte x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateByte not supported");
+  }
+
+  @Override
+  public void updateByte(String name, byte x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateByte not supported");
+  }
+
+  /**
+   * Unsupported: this result set is read-only.
+   *
+   * @throws SQLException always, as SQLFeatureNotSupportedException
+   */
+  @Override
+  public void updateBytes(int index, byte[] x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBytes not supported");
+  }
+
+  /**
+   * Unsupported: this result set is read-only.
+   *
+   * @throws SQLException always, as SQLFeatureNotSupportedException
+   */
+  @Override
+  public void updateBytes(String name, byte[] x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateBytes not supported");
+  }
+
+  @Override
+  public void updateCharacterStream(int index, Reader x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateCharacterStream not supported");
+  }
+
+  @Override
+  public void updateCharacterStream(String name, Reader x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateCharacterStream not supported");
+  }
+
+  @Override
+  public void updateCharacterStream(int index, Reader x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateCharacterStream not supported");
+  }
+
+  @Override
+  public void updateCharacterStream(String name, Reader x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateCharacterStream not supported");
+  }
+
+  @Override
+  public void updateCharacterStream(int index, Reader x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateCharacterStream not supported");
+  }
+
+  @Override
+  public void updateCharacterStream(String name, Reader x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateCharacterStream not supported");
+  }
+
+  @Override
+  public void updateClob(int index, Clob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateClob not supported");
+  }
+
+  @Override
+  public void updateClob(String name, Clob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateClob not supported");
+  }
+
+  @Override
+  public void updateClob(int index, Reader x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateClob not supported");
+  }
+
+  @Override
+  public void updateClob(String name, Reader x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateClob not supported");
+  }
+
+  @Override
+  public void updateClob(int index, Reader x, long length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateClob not supported");
+  }
+
+  @Override
+  public void updateClob(String name, Reader x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateClob not supported");
+  }
+
+  @Override
+  public void updateDate(int index, Date x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateDate not supported");
+  }
+
+  @Override
+  public void updateDate(String name, Date x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateDate not supported");
+  }
+
+  @Override
+  public void updateDouble(int index, double x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateDouble not supported");
+  }
+
+  @Override
+  public void updateDouble(String name, double x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateDouble not supported");
+  }
+
+  @Override
+  public void updateFloat(int index, float x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateFloat not supported");
+  }
+
+  @Override
+  public void updateFloat(String name, float x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateFloat not supported");
+  }
+
+  @Override
+  public void updateInt(int index, int x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateInt not supported");
+  }
+
+  @Override
+  public void updateInt(String name, int x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateInt not supported");
+  }
+
+  @Override
+  public void updateLong(int index, long x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateLong not supported");
+  }
+
+  @Override
+  public void updateLong(String name, long x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateLong not supported");
+  }
+
+  @Override
+  public void updateNCharacterStream(int index, Reader x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNCharacterStream not supported");
+  }
+
+  @Override
+  public void updateNCharacterStream(String name, Reader x)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNCharacterStream not supported");
+  }
+
+  @Override
+  public void updateNCharacterStream(int index, Reader x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNCharacterStream not supported");
+  }
+
+  @Override
+  public void updateNCharacterStream(String name, Reader x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNCharacterStream not supported");
+  }
+
+  @Override
+  public void updateNClob(int index, NClob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNClob not supported");
+  }
+
+  @Override
+  public void updateNClob(String name, NClob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNClob not supported");
+  }
+
+  @Override
+  public void updateNClob(int index, Reader x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNClob not supported");
+  }
+
+  @Override
+  public void updateNClob(String name, Reader x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNClob not supported");
+  }
+
+  @Override
+  public void updateNClob(int index, Reader x, long length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNClob not supported");
+  }
+
+  @Override
+  public void updateNClob(String name, Reader x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNClob not supported");
+  }
+
+  @Override
+  public void updateNString(int arg0, String x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNString not supported");
+  }
+
+  @Override
+  public void updateNString(String name, String x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNString not supported");
+  }
+
+  @Override
+  public void updateNull(int arg0) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNull not supported");
+  }
+
+  @Override
+  public void updateNull(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateNull not supported");
+  }
+
+  @Override
+  public void updateObject(int index, Object x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateObject not supported");
+  }
+
+  @Override
+  public void updateObject(String name, Object x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateObject not supported");
+  }
+
+  @Override
+  public void updateObject(int index, Object x, int length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateObject not supported");
+  }
+
+  @Override
+  public void updateObject(String name, Object x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateObject not supported");
+  }
+
+  @Override
+  public void updateRef(int index, Ref x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateRef not supported");
+  }
+
+  @Override
+  public void updateRef(String name, Ref x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateRef not supported");
+  }
+
+  @Override
+  public void updateRow() throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateRow not supported");
+  }
+
+  @Override
+  public void updateRowId(int index, RowId arg1) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateRowId not supported");
+  }
+
+  @Override
+  public void updateRowId(String name, RowId x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateRowId not supported");
+  }
+
+  @Override
+  public void updateSQLXML(int index, SQLXML arg1) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateSQLXML not supported");
+  }
+
+  @Override
+  public void updateSQLXML(String name, SQLXML x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateSQLXML not supported");
+
+  }
+
+  @Override
+  public void updateShort(int index, short x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateShort not supported");
+
+  }
+
+  @Override
+  public void updateShort(String name, short x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateShort not supported");
+
+  }
+
+  @Override
+  public void updateString(int index, String x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateString not supported");
+
+  }
+
+  @Override
+  public void updateString(String name, String arg1) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateString not supported");
+
+  }
+
+  @Override
+  public void updateTime(int index, Time x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateTime not supported");
+
+  }
+
+  @Override
+  public void updateTime(String name, Time x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateTime not supported");
+
+  }
+
+  @Override
+  public void updateTimestamp(int index, Timestamp x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateTimestamp not supported");
+
+  }
+
+  @Override
+  public void updateTimestamp(String name, Timestamp x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("updateTimestamp not supported");
+
+  }
+
+  @Override
+  public boolean wasNull() throws SQLException {
+    return wasNull;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetMetaData.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetMetaData.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetMetaData.java
new file mode 100644
index 0000000..5a04ad0
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSetMetaData.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+/**
+ * {@link ResultSetMetaData} implementation that answers from a Tajo {@link Schema}.
+ *
+ * Column arguments are 1-based; they are shifted by one before being passed to the schema.
+ */
+public class TajoResultSetMetaData implements ResultSetMetaData {
+  /** Schema of the result; when null, {@link #getColumnCount()} reports zero columns. */
+  Schema schema;
+
+  public TajoResultSetMetaData(Schema schema) {
+    this.schema = schema;
+  }
+
+  /**
+   * Rejects column indexes that do not address a column of the schema.
+   *
+   * @param column 1-based column index
+   * @throws SQLException if the schema is null or the index is outside 1..getColumnCount()
+   */
+  private void checkColumn(int column) throws SQLException {
+    if (column < 1 || column > getColumnCount()) {
+      throw new SQLException("Invalid column index: " + column);
+    }
+  }
+
+  /** Returns true when this object is an instance of the requested class. */
+  @Override
+  public boolean isWrapperFor(Class<?> clazz) throws SQLException {
+    return clazz != null && clazz.isInstance(this);
+  }
+
+  /**
+   * Returns this object cast to the requested class.
+   *
+   * @throws SQLException if this object is not an instance of the requested class
+   */
+  @Override
+  public <T> T unwrap(Class<T> clazz) throws SQLException {
+    if (!isWrapperFor(clazz)) {
+      throw new SQLException("Not a wrapper for " + clazz);
+    }
+    return clazz.cast(this);
+  }
+
+  @Override
+  public String getCatalogName(int column) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getCatalogName not supported");
+  }
+
+  @Override
+  public String getColumnClassName(int column) throws SQLException {
+    checkColumn(column);
+    // NOTE(review): this is the class name of the object returned by schema.getColumn(),
+    // not of the Java type of the column's values -- confirm whether that is intended.
+    return schema.getColumn(column - 1).getClass().getName();
+  }
+
+  @Override
+  public int getColumnCount() throws SQLException {
+    if (schema == null) {
+      return 0;
+    }
+    return schema.getColumnNum();
+  }
+
+  @Override
+  public int getColumnDisplaySize(int column) throws SQLException {
+    return TajoDriver.columnDisplaySize(getColumnType(column));
+  }
+
+  @Override
+  public String getColumnLabel(int column) throws SQLException {
+    checkColumn(column);
+    return schema.getColumn(column - 1).getQualifiedName();
+  }
+
+  @Override
+  public String getColumnName(int column) throws SQLException {
+    checkColumn(column);
+    return schema.getColumn(column - 1).getColumnName();
+  }
+
+  @Override
+  public int getColumnType(int column) throws SQLException {
+    checkColumn(column);
+    DataType type = schema.getColumn(column - 1).getDataType();
+
+    return TajoDriver.tajoTypeToSqlType(type);
+  }
+
+  @Override
+  public String getColumnTypeName(int column) throws SQLException {
+    checkColumn(column);
+    DataType type = schema.getColumn(column - 1).getDataType();
+
+    return TajoDriver.toSqlType(type);
+  }
+
+  @Override
+  public int getPrecision(int column) throws SQLException {
+    return TajoDriver.columnDisplaySize(getColumnType(column));
+  }
+
+  @Override
+  public int getScale(int column) throws SQLException {
+    return TajoDriver.columnScale(getColumnType(column));
+  }
+
+  @Override
+  public String getSchemaName(int column) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getSchemaName not supported");
+  }
+
+  @Override
+  public String getTableName(int column) throws SQLException {
+    checkColumn(column);
+    return schema.getColumn(column - 1).getQualifier();
+  }
+
+  @Override
+  public boolean isAutoIncrement(int column) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean isCaseSensitive(int column) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean isCurrency(int column) throws SQLException {
+    throw new SQLFeatureNotSupportedException("isCurrency not supported");
+  }
+
+  @Override
+  public boolean isDefinitelyWritable(int column) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public int isNullable(int column) throws SQLException {
+    return ResultSetMetaData.columnNullable;
+  }
+
+  @Override
+  public boolean isReadOnly(int column) throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean isSearchable(int column) throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean isSigned(int column) throws SQLException {
+    throw new SQLFeatureNotSupportedException("isSigned not supported");
+  }
+
+  @Override
+  public boolean isWritable(int column) throws SQLException {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
new file mode 100644
index 0000000..6002fcd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.client.TajoClient;
+
+import java.sql.*;
+
+/**
+ * {@link Statement} implementation that sends SQL text to Tajo through a {@link TajoClient}.
+ */
+public class TajoStatement implements Statement {
+  private final TajoClient tajoClient;
+  private int fetchSize = 200;
+
+  /**
+   * We need to keep a reference to the result set to support the following:
+   * <code>
+   * statement.execute(String sql);
+   * statement.getResultSet();
+   * </code>.
+   */
+  private ResultSet resultSet = null;
+
+  /**
+   * Add SQLWarnings to the warningChain if needed.
+   */
+  private SQLWarning warningChain = null;
+
+  /**
+   * Keep state so we can fail certain calls made after close().
+   */
+  private boolean isClosed = false;
+
+  public TajoStatement(TajoClient tajoClient) {
+    this.tajoClient = tajoClient;
+  }
+
+  @Override
+  public void addBatch(String sql) throws SQLException {
+    throw new SQLFeatureNotSupportedException("addBatch not supported");
+  }
+
+  @Override
+  public void cancel() throws SQLException {
+    throw new SQLFeatureNotSupportedException("cancel not supported");
+  }
+
+  @Override
+  public void clearBatch() throws SQLException {
+    throw new SQLFeatureNotSupportedException("clearBatch not supported");
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    warningChain = null;
+  }
+
+  /**
+   * Closes this statement together with the result set kept by execute(String), if any.
+   * Calling it on an already closed statement has no effect.
+   */
+  @Override
+  public void close() throws SQLException {
+    if (isClosed) {
+      return;
+    }
+    isClosed = true;
+
+    // Detach the result set first so the statement is left clean even if closing it fails.
+    ResultSet current = resultSet;
+    resultSet = null;
+    if (current != null) {
+      current.close();
+    }
+  }
+
+  public void closeOnCompletion() throws SQLException {
+     // JDK 1.7
+     throw new SQLFeatureNotSupportedException("closeOnCompletion not supported");
+  }
+
+  @Override
+  public boolean execute(String sql) throws SQLException {
+    resultSet = executeQuery(sql);
+
+    return resultSet != null;
+  }
+
+  @Override
+  public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new SQLFeatureNotSupportedException("execute not supported");
+  }
+
+  @Override
+  public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+    throw new SQLFeatureNotSupportedException("execute not supported");
+  }
+
+  @Override
+  public boolean execute(String sql, String[] columnNames) throws SQLException {
+    throw new SQLFeatureNotSupportedException("execute not supported");
+  }
+
+  @Override
+  public int[] executeBatch() throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeBatch not supported");
+  }
+
+  @Override
+  public ResultSet executeQuery(String sql) throws SQLException {
+    if (isClosed) {
+      throw new SQLFeatureNotSupportedException("Can't execute after statement has been closed");
+    }
+
+    try {
+      return tajoClient.executeQueryAndGetResult(sql);
+    } catch (Exception e) {
+      throw new SQLFeatureNotSupportedException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Submits the statement through the client; the returned count is always 1.
+   *
+   * @throws SQLException if this statement is closed or the client fails; a client
+   *         failure is attached as the cause
+   */
+  @Override
+  public int executeUpdate(String sql) throws SQLException {
+    if (isClosed) {
+      throw new SQLFeatureNotSupportedException("Can't execute after statement has been closed");
+    }
+
+    try {
+      tajoClient.executeQuery(sql);
+
+      return 1;
+    } catch (Exception ex) {
+      // Same exception type and message as before, now with the cause attached.
+      throw new SQLFeatureNotSupportedException(ex.toString(), ex);
+    }
+  }
+
+  @Override
+  public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+  }
+
+  @Override
+  public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+  }
+
+  @Override
+  public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+  }
+
+  @Override
+  public Connection getConnection() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getConnection not supported");
+  }
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getFetchDirection not supported");
+  }
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    return fetchSize;
+  }
+
+  @Override
+  public ResultSet getGeneratedKeys() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported");
+  }
+
+  @Override
+  public int getMaxFieldSize() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported");
+  }
+
+  @Override
+  public int getMaxRows() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxRows not supported");
+  }
+
+  @Override
+  public boolean getMoreResults() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+  }
+
+  @Override
+  public boolean getMoreResults(int current) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+  }
+
+  @Override
+  public int getQueryTimeout() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getQueryTimeout not supported");
+  }
+
+  @Override
+  public ResultSet getResultSet() throws SQLException {
+    return resultSet;
+  }
+
+  @Override
+  public int getResultSetConcurrency() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported");
+  }
+
+  @Override
+  public int getResultSetHoldability() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported");
+  }
+
+  @Override
+  public int getResultSetType() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getResultSetType not supported");
+  }
+
+  @Override
+  public int getUpdateCount() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return warningChain;
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return isClosed;
+  }
+
+  public boolean isCloseOnCompletion() throws SQLException {
+    // JDK 1.7
+    throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported");
+  }
+
+  @Override
+  public boolean isPoolable() throws SQLException {
+    throw new SQLFeatureNotSupportedException("isPoolable not supported");
+  }
+
+  @Override
+  public void setCursorName(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setCursorName not supported");
+  }
+
+  @Override
+  public void setEscapeProcessing(boolean enable) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setEscapeProcessing not supported");
+  }
+
+  @Override
+  public void setFetchDirection(int direction) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setFetchDirection not supported");
+  }
+
+  /**
+   * @throws SQLException if rows is negative
+   */
+  @Override
+  public void setFetchSize(int rows) throws SQLException {
+    if (rows < 0) {
+      throw new SQLException("Fetch size must not be negative: " + rows);
+    }
+    fetchSize = rows;
+  }
+
+  @Override
+  public void setMaxFieldSize(int max) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported");
+  }
+
+  @Override
+  public void setMaxRows(int max) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setMaxRows not supported");
+  }
+
+  @Override
+  public void setPoolable(boolean poolable) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setPoolable not supported");
+  }
+
+  @Override
+  public void setQueryTimeout(int seconds) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setQueryTimeout not supported");
+  }
+
+  /** Returns true when this object is an instance of the requested class. */
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return iface != null && iface.isInstance(this);
+  }
+
+  /**
+   * Returns this object cast to the requested class.
+   *
+   * @throws SQLException if this object is not an instance of the requested class
+   */
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    if (!isWrapperFor(iface)) {
+      throw new SQLException("Not a wrapper for " + iface);
+    }
+    return iface.cast(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 487d7af..3fd38ec 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.client.ResultSetUtil;
 import org.apache.tajo.util.TUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
deleted file mode 100644
index 48c424d..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.query;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.client.ResultSetImpl;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.*;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-
-import static org.junit.Assert.*;
-
-@Category(IntegrationTest.class)
-public class TestResultSetImpl {
-  private static TajoTestingCluster util;
-  private static TajoConf conf;
-  private static TableDesc desc;
-  private static AbstractStorageManager sm;
-  private static TableMeta scoreMeta;
-  private static Schema scoreSchema;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    util = TpchTestBase.getInstance().getTestingCluster();
-    conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf);
-
-    scoreSchema = new Schema();
-    scoreSchema.addColumn("deptname", Type.TEXT);
-    scoreSchema.addColumn("score", Type.INT4);
-    scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV);
-    TableStats stats = new TableStats();
-
-    Path p = sm.getTablePath("score");
-    sm.getFileSystem().mkdirs(p);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scoreSchema,
-        new Path(p, "score"));
-    appender.init();
-    int deptSize = 100;
-    int tupleNum = 10000;
-    Tuple tuple;
-    long written = 0;
-    for (int i = 0; i < tupleNum; i++) {
-      tuple = new VTuple(2);
-      String key = "test" + (i % deptSize);
-      tuple.put(0, DatumFactory.createText(key));
-      tuple.put(1, DatumFactory.createInt4(i + 1));
-      written += key.length() + Integer.SIZE;
-      appender.addTuple(tuple);
-    }
-    appender.close();
-    stats.setNumRows(tupleNum);
-    stats.setNumBytes(written);
-    stats.setAvgRows(tupleNum);
-    stats.setNumBlocks(1000);
-    stats.setNumPartitions(100);
-    desc = new TableDesc("score", scoreSchema, scoreMeta, p);
-    desc.setStats(stats);
-  }
-
-  @AfterClass
-  public static void terminate() throws IOException {
-
-  }
-
-  @Test
-  public void test() throws IOException, SQLException {
-    ResultSetImpl rs = new ResultSetImpl(null, null, conf, desc);
-    ResultSetMetaData meta = rs.getMetaData();
-    assertNotNull(meta);
-    Schema schema = scoreSchema;
-    assertEquals(schema.getColumnNum(), meta.getColumnCount());
-    for (int i = 0; i < meta.getColumnCount(); i++) {
-      assertEquals(schema.getColumn(i).getColumnName(), meta.getColumnName(i + 1));
-      assertEquals(schema.getColumn(i).getQualifier(), meta.getTableName(i + 1));
-      assertEquals(schema.getColumn(i).getDataType().getClass().getCanonicalName(),
-          meta.getColumnTypeName(i + 1));
-    }
-
-    int i = 0;
-    assertTrue(rs.isBeforeFirst());
-    for (; rs.next(); i++) {
-      assertEquals("test"+i%100, rs.getString(1));
-      assertEquals("test"+i%100, rs.getString("deptname"));
-      assertEquals(i+1, rs.getInt(2));
-      assertEquals(i+1, rs.getInt("score"));
-    }
-    assertEquals(10000, i);
-    assertTrue(rs.isAfterLast());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 264afb7..102414b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.query;
 
-import org.apache.tajo.client.ResultSetUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
new file mode 100644
index 0000000..0052827
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.jdbc;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration test that writes a two-column "score" table as CSV and reads it back
+ * through {@link TajoResultSet}.
+ */
+@Category(IntegrationTest.class)
+public class TestResultSet {
+  private static TajoTestingCluster util;
+  private static TajoConf conf;
+  private static TableDesc desc;
+  private static AbstractStorageManager sm;
+  private static TableMeta scoreMeta;
+  private static Schema scoreSchema;
+
+  /** Builds the "score" table: 10000 (deptname, score) rows over 100 department names. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    util = TpchTestBase.getInstance().getTestingCluster();
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf);
+
+    scoreSchema = new Schema();
+    scoreSchema.addColumn("deptname", Type.TEXT);
+    scoreSchema.addColumn("score", Type.INT4);
+    scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    final int deptSize = 100;
+    final int tupleNum = 10000;
+
+    Path tablePath = sm.getTablePath("score");
+    sm.getFileSystem().mkdirs(tablePath);
+    Path dataFile = new Path(tablePath, "score");
+    Appender appender =
+        StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, dataFile);
+    appender.init();
+
+    long written = 0;
+    for (int row = 0; row < tupleNum; row++) {
+      String key = "test" + (row % deptSize);
+      Tuple tuple = new VTuple(2);
+      tuple.put(0, DatumFactory.createText(key));
+      tuple.put(1, DatumFactory.createInt4(row + 1));
+      appender.addTuple(tuple);
+      written += key.length() + Integer.SIZE;
+    }
+    appender.close();
+
+    TableStats stats = new TableStats();
+    stats.setNumRows(tupleNum);
+    stats.setNumBytes(written);
+    stats.setAvgRows(tupleNum);
+    stats.setNumBlocks(1000);
+    stats.setNumPartitions(100);
+    desc = new TableDesc("score", scoreSchema, scoreMeta, tablePath);
+    desc.setStats(stats);
+  }
+
+  @AfterClass
+  public static void terminate() throws IOException {
+    // No teardown is performed.
+  }
+
+  /** Checks the metadata against the schema, then reads every row back in write order. */
+  @Test
+  public void test() throws IOException, SQLException {
+    TajoResultSet rs = new TajoResultSet(null, null, conf, desc);
+    ResultSetMetaData meta = rs.getMetaData();
+    assertNotNull(meta);
+    assertEquals(scoreSchema.getColumnNum(), meta.getColumnCount());
+
+    for (int col = 1; col <= meta.getColumnCount(); col++) {
+      assertEquals(scoreSchema.getColumn(col - 1).getColumnName(), meta.getColumnName(col));
+      assertEquals(scoreSchema.getColumn(col - 1).getQualifier(), meta.getTableName(col));
+      // The type name is only printed; it is not asserted here.
+      System.out.println(">>>>>>>>>>" + meta.getColumnTypeName(col));
+    }
+
+    assertTrue(rs.isBeforeFirst());
+    int rowCount = 0;
+    while (rs.next()) {
+      String expectedDept = "test" + rowCount % 100;
+      int expectedScore = rowCount + 1;
+      assertEquals(expectedDept, rs.getString(1));
+      assertEquals(expectedDept, rs.getString("deptname"));
+      assertEquals(expectedScore, rs.getInt(2));
+      assertEquals(expectedScore, rs.getInt("score"));
+      rowCount++;
+    }
+    assertEquals(10000, rowCount);
+    assertTrue(rs.isAfterLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
new file mode 100644
index 0000000..98a7ed2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import com.google.common.collect.Maps;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.NetUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.InetSocketAddress;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestTajoJdbc {
+  private static TpchTestBase tpch;
+  private static Connection conn;
+
+  private static String connUri;
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance();
+
+    TajoConf tajoConf = tpch.getTestingCluster().getMaster().getContext().getConf();
+    InetSocketAddress tajoMasterAddress =
+        NetUtils.createSocketAddr(tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+
+    Class.forName("org.apache.tajo.jdbc.TajoDriver").newInstance();
+
+    connUri = "jdbc:tajo://" + tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort();
+    conn = DriverManager.getConnection(connUri);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(conn != null) {
+      conn.close();
+    }
+  }
+
+  @Test
+  public void testStatement() throws Exception {
+    Statement stmt = null;
+    ResultSet res = null;
+    try {
+      stmt = conn.createStatement();
+
+      res = stmt.executeQuery("select l_returnflag, l_linestatus, count(*) as count_order from lineitem " +
+          "group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus");
+
+      try {
+        Map<String,Integer> result = Maps.newHashMap();
+        result.put("NO", 3);
+        result.put("RF", 2);
+
+        assertNotNull(res);
+        assertTrue(res.next());
+        assertTrue(result.get(res.getString(1) + res.getString(2)) == res.getInt(3));
+        assertTrue(res.next());
+        assertTrue(result.get(res.getString(1) + res.getString(2)) == res.getInt(3));
+        assertFalse(res.next());
+
+        ResultSetMetaData rsmd = res.getMetaData();
+        assertEquals(3, rsmd.getColumnCount());
+        assertEquals("l_returnflag", rsmd.getColumnName(1));
+        assertEquals("l_linestatus", rsmd.getColumnName(2));
+        assertEquals("count_order", rsmd.getColumnName(3));
+      } finally {
+        res.close();
+      }
+    } finally {
+      if(res != null) {
+        res.close();
+      }
+      if(stmt != null) {
+        stmt.close();
+      }
+    }
+  }
+
+  @Test
+  public void testPreparedStatement() throws Exception {
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    try {
+      /*
+      test data set
+      1,17.0,N
+      1,36.0,N
+      2,38.0,N
+      3,45.0,R
+      3,49.0,R
+      */
+
+      String sql =
+          "select l_orderkey, l_quantity, l_returnflag from lineitem where l_quantity > ? and l_returnflag = ?";
+
+      stmt = conn.prepareStatement(sql);
+
+      stmt.setInt(1, 20);
+      stmt.setString(2, "N");
+
+      res = stmt.executeQuery();
+
+      ResultSetMetaData rsmd = res.getMetaData();
+      assertEquals(3, rsmd.getColumnCount());
+      assertEquals("l_orderkey", rsmd.getColumnName(1));
+      assertEquals("l_quantity", rsmd.getColumnName(2));
+      assertEquals("l_returnflag", rsmd.getColumnName(3));
+
+      try {
+        int numRows = 0;
+        String[] resultData = {"136.0N", "238.0N"};
+        while(res.next()) {
+          assertEquals(resultData[numRows],
+              ("" + res.getObject(1).toString() + res.getObject(2).toString() + res.getObject(3).toString()));
+          numRows++;
+        }
+        assertEquals(2, numRows);
+      } finally {
+        res.close();
+      }
+
+      stmt.setInt(1, 20);
+      stmt.setString(2, "R");
+
+      res = stmt.executeQuery();
+
+      rsmd = res.getMetaData();
+      assertEquals(3, rsmd.getColumnCount());
+      assertEquals("l_orderkey", rsmd.getColumnName(1));
+      assertEquals("l_quantity", rsmd.getColumnName(2));
+      assertEquals("l_returnflag", rsmd.getColumnName(3));
+
+      try {
+        int numRows = 0;
+        String[] resultData = {"345.0R", "349.0R"};
+        while(res.next()) {
+          assertEquals(resultData[numRows],
+              ("" + res.getObject(1).toString() + res.getObject(2).toString() + res.getObject(3).toString()));
+          numRows++;
+        }
+        assertEquals(2, numRows);
+      } finally {
+        res.close();
+      }
+    } finally {
+      if(res != null) {
+        res.close();
+      }
+      if(stmt != null) {
+        stmt.close();
+      }
+    }
+  }
+
+  /**
+   * Lists every table through {@code DatabaseMetaData.getTables} with all
+   * filters left null and compares the result against the catalog.
+   *
+   * The result set must expose five columns, and its TABLE_NAME values must
+   * match, row by row, the catalog's table names after sorting them.
+   */
+  @Test
+  public void testDatabaseMetaDataGetTable() throws Exception {
+    DatabaseMetaData metaData = conn.getMetaData();
+    ResultSet tables = null;
+
+    try {
+      tables = metaData.getTables(null, null, null, null);
+      assertEquals(5, tables.getMetaData().getColumnCount());
+
+      // Expected names come straight from the catalog; sorted so that they can
+      // be compared positionally with the rows returned by the driver.
+      List<String> expectedNames = new ArrayList<String>(
+          tpch.getTestingCluster().getMaster().getCatalog().getAllTableNames());
+      Collections.sort(expectedNames);
+
+      int seen = 0;
+      for (; tables.next(); seen++) {
+        assertEquals(expectedNames.get(seen), tables.getString("TABLE_NAME"));
+      }
+
+      // Every catalog table must have been reported exactly once.
+      assertEquals(expectedNames.size(), seen);
+    } finally {
+      if (tables != null) {
+        tables.close();
+      }
+    }
+  }
+
+  /**
+   * Fetches the column metadata of the {@code lineitem} table through
+   * {@code DatabaseMetaData.getColumns} and compares it with the catalog.
+   *
+   * The metadata result set must have 22 columns, every row must name the
+   * requested table, and the COLUMN_NAME values must match the catalog schema
+   * in order. Exactly 16 rows are expected.
+   */
+  @Test
+  public void testDatabaseMetaDataGetColumns() throws Exception {
+    DatabaseMetaData dbmd = conn.getMetaData();
+
+    ResultSet rs = null;
+
+    try {
+      String tableName = "lineitem";
+      rs = dbmd.getColumns(null, null, tableName, null);
+
+      ResultSetMetaData rsmd = rs.getMetaData();
+      int numCols = rsmd.getColumnCount();
+
+      assertEquals(22, numCols);
+      int numColumns = 0;
+
+      TableDesc tableDesc = tpch.getTestingCluster().getMaster().getCatalog().getTableDesc(tableName);
+      assertNotNull(tableDesc);
+
+      List<Column> columns = tableDesc.getSchema().getColumns();
+
+      while(rs.next()) {
+        // Fail with a readable message instead of an IndexOutOfBoundsException
+        // when the driver reports more columns than the catalog schema holds.
+        assertTrue("getColumns returned more rows than the catalog schema has columns",
+            numColumns < columns.size());
+        assertEquals(tableName, rs.getString("TABLE_NAME"));
+        assertEquals(columns.get(numColumns).getColumnName(), rs.getString("COLUMN_NAME"));
+        //TODO assert type
+        numColumns++;
+      }
+
+      assertEquals(16, numColumns);
+    } finally {
+      if(rs != null) {
+        rs.close();
+      }
+    }
+  }
+
+  /**
+   * Opens two connections to the same URI, runs the same aggregation query on
+   * each in turn, and closes both connections only after both queries are done.
+   *
+   * Each query must return exactly two rows whose count matches the expected
+   * value for the (l_returnflag, l_linestatus) pair, and the result set
+   * metadata must expose the three selected column names.
+   */
+  @Test
+  public void testMultipleConnections() throws Exception {
+    Connection[] conns = new Connection[2];
+
+    try {
+      // Opened inside the try so that a failure on the second connection
+      // still closes the first one.
+      conns[0] = DriverManager.getConnection(connUri);
+      conns[1] = DriverManager.getConnection(connUri);
+
+      for(int i = 0; i < conns.length; i++) {
+        Statement stmt = null;
+        ResultSet res = null;
+        try {
+          stmt = conns[i].createStatement();
+
+          res = stmt.executeQuery("select l_returnflag, l_linestatus, count(*) as count_order from lineitem " +
+              "group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus");
+
+          // Expected count per concatenated (l_returnflag + l_linestatus) key.
+          Map<String,Integer> result = Maps.newHashMap();
+          result.put("NO", 3);
+          result.put("RF", 2);
+
+          assertNotNull(res);
+          for(int row = 0; row < 2; row++) {
+            assertTrue(res.next());
+            // Object comparison: an unexpected key yields an assertion failure
+            // showing both values rather than a NullPointerException on unboxing.
+            assertEquals(result.get(res.getString(1) + res.getString(2)), Integer.valueOf(res.getInt(3)));
+          }
+          assertFalse(res.next());
+
+          ResultSetMetaData rsmd = res.getMetaData();
+          assertEquals(3, rsmd.getColumnCount());
+          assertEquals("l_returnflag", rsmd.getColumnName(1));
+          assertEquals("l_linestatus", rsmd.getColumnName(2));
+          assertEquals("count_order", rsmd.getColumnName(3));
+        } finally {
+          // Close the statement even if closing the result set fails.
+          try {
+            if(res != null) {
+              res.close();
+            }
+          } finally {
+            if(stmt != null) {
+              stmt.close();
+            }
+          }
+        }
+      }
+    } finally {
+      // Close the second connection even if closing the first one fails.
+      try {
+        if(conns[0] != null) {
+          conns[0].close();
+        }
+      } finally {
+        if(conns[1] != null) {
+          conns[1].close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Opens two connections to the same URI up front, then for each one runs the
+   * aggregation query and closes that connection immediately afterwards, so
+   * the second query runs after the first connection has already been closed.
+   *
+   * Each query must return exactly two rows whose count matches the expected
+   * value for the (l_returnflag, l_linestatus) pair, and the result set
+   * metadata must expose the three selected column names.
+   */
+  @Test
+  public void testMultipleConnectionsSequentialClose() throws Exception {
+    Connection[] conns = new Connection[2];
+
+    try {
+      // Both connections are opened before any query runs; doing it inside the
+      // try ensures the first is closed if opening the second fails.
+      conns[0] = DriverManager.getConnection(connUri);
+      conns[1] = DriverManager.getConnection(connUri);
+
+      for(int i = 0; i < conns.length; i++) {
+        Statement stmt = null;
+        ResultSet res = null;
+        try {
+          stmt = conns[i].createStatement();
+
+          res = stmt.executeQuery("select l_returnflag, l_linestatus, count(*) as count_order from lineitem " +
+              "group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus");
+
+          // Expected count per concatenated (l_returnflag + l_linestatus) key.
+          Map<String,Integer> result = Maps.newHashMap();
+          result.put("NO", 3);
+          result.put("RF", 2);
+
+          assertNotNull(res);
+          for(int row = 0; row < 2; row++) {
+            assertTrue(res.next());
+            // Object comparison: an unexpected key yields an assertion failure
+            // showing both values rather than a NullPointerException on unboxing.
+            assertEquals(result.get(res.getString(1) + res.getString(2)), Integer.valueOf(res.getInt(3)));
+          }
+          assertFalse(res.next());
+
+          ResultSetMetaData rsmd = res.getMetaData();
+          assertEquals(3, rsmd.getColumnCount());
+          assertEquals("l_returnflag", rsmd.getColumnName(1));
+          assertEquals("l_linestatus", rsmd.getColumnName(2));
+          assertEquals("count_order", rsmd.getColumnName(3));
+        } finally {
+          // Release in order result set -> statement -> connection, making
+          // sure a failure at one step does not skip the remaining ones.
+          try {
+            if(res != null) {
+              res.close();
+            }
+          } finally {
+            try {
+              if(stmt != null) {
+                stmt.close();
+              }
+            } finally {
+              conns[i].close();
+            }
+          }
+        }
+      }
+    } finally {
+      // Safety net for connections the loop never reached or failed to close.
+      try {
+        if(conns[0] != null && !conns[0].isClosed()) {
+          conns[0].close();
+        }
+      } finally {
+        if(conns[1] != null && !conns[1].isClosed()) {
+          conns[1].close();
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 88c8244..a03396e 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -87,7 +87,9 @@ public abstract class NettyClientBase implements Closeable {
   public void close() {
     this.channel.close().awaitUninterruptibly();
     this.bootstrap.releaseExternalResources();
-    LOG.info("Proxy is disconnected from " +
-        addr.getAddress().getHostAddress() + ":" + addr.getPort());
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Proxy is disconnected from " +
+          addr.getAddress().getHostAddress() + ":" + addr.getPort());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index 94f1720..8eacaf5 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -40,9 +40,9 @@ public class RpcConnectionPool {
   }
 
   public synchronized static RpcConnectionPool getPool(TajoConf conf) {
-      if(instance == null) {
-        instance = new RpcConnectionPool(conf);
-      }
+    if(instance == null) {
+      instance = new RpcConnectionPool(conf);
+    }
 
     return instance;
   }
@@ -75,7 +75,9 @@ public class RpcConnectionPool {
     }
 
     try {
-      LOG.info("CloseConnection [" + client.getKey() + "]");
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("CloseConnection [" + client.getKey() + "]");
+      }
       synchronized(connections) {
         connections.remove(client.getKey());
       }
@@ -85,8 +87,10 @@ public class RpcConnectionPool {
     }
   }
 
-  public void close() {
-    LOG.info("Pool Closed");
+  public synchronized void close() {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Pool Closed");
+    }
     synchronized(connections) {
       for(NettyClientBase eachClient: connections.values()) {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index 50ec204..c5303d8 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -35,14 +35,21 @@ public abstract class ServerCallable<T> {
   protected long endTime;
   protected Class protocol;
   protected boolean asyncMode;
+  protected boolean closeConn;
 
   public abstract T call(NettyClientBase client) throws Exception;
 
   public ServerCallable(TajoConf conf, InetSocketAddress addr, Class protocol, boolean asyncMode) {
+    this(conf, addr, protocol, asyncMode, false);
+  }
+
+  public ServerCallable(TajoConf conf, InetSocketAddress addr, Class protocol,
+                        boolean asyncMode, boolean closeConn) {
     this.tajoConf = conf;
     this.addr = addr;
     this.protocol = protocol;
     this.asyncMode = asyncMode;
+    this.closeConn = closeConn;
   }
 
   public void beforeCall() {
@@ -81,8 +88,10 @@ public abstract class ServerCallable<T> {
         }
         return call(client);
       } catch (Throwable t) {
-        RpcConnectionPool.getPool(tajoConf).closeConnection(client);
-        client = null;
+        if(!closeConn) {
+          RpcConnectionPool.getPool(tajoConf).closeConnection(client);
+          client = null;
+        }
         exceptions.add(t);
         if(abort) {
           throw new ServiceException(t.getMessage(), t);
@@ -92,7 +101,11 @@ public abstract class ServerCallable<T> {
         }
       } finally {
         afterCall();
-        RpcConnectionPool.getPool(tajoConf).releaseConnection(client);
+        if(closeConn) {
+          RpcConnectionPool.getPool(tajoConf).closeConnection(client);
+        } else {
+          RpcConnectionPool.getPool(tajoConf).releaseConnection(client);
+        }
       }
       try {
         Thread.sleep(pause * (tries + 1));
@@ -118,8 +131,10 @@ public abstract class ServerCallable<T> {
       client = RpcConnectionPool.getPool(tajoConf).getConnection(addr, protocol, asyncMode);
       return call(client);
     } catch (Throwable t) {
-      RpcConnectionPool.getPool(tajoConf).closeConnection(client);
-      client = null;
+      if(!closeConn) {
+        RpcConnectionPool.getPool(tajoConf).closeConnection(client);
+        client = null;
+      }
       Throwable t2 = translateException(t);
       if (t2 instanceof IOException) {
         throw (IOException)t2;
@@ -128,7 +143,11 @@ public abstract class ServerCallable<T> {
       }
     } finally {
       afterCall();
-      RpcConnectionPool.getPool(tajoConf).releaseConnection(client);
+      if(closeConn) {
+        RpcConnectionPool.getPool(tajoConf).closeConnection(client);
+      } else {
+        RpcConnectionPool.getPool(tajoConf).releaseConnection(client);
+      }
     }
   }
 


Mime
View raw message