tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [7/8] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Date Sat, 18 Apr 2015 03:41:13 GMT
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/882297e7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/882297e7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/882297e7

Branch: refs/heads/index_support
Commit: 882297e788e13e991a7445a576d3dd73846291e7
Parents: 4d7910a a745385
Author: Jihoon Son <jihoonson@apache.org>
Authored: Sat Apr 18 12:35:34 2015 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Sat Apr 18 12:35:34 2015 +0900

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 CHANGES                                         |  15 +
 tajo-catalog/pom.xml                            |   4 +-
 .../tajo/catalog/AbstractCatalogClient.java     |  90 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |   3 +-
 .../org/apache/tajo/catalog/FunctionDesc.java   |   4 +-
 .../tajo/function/FunctionInvocation.java       |  24 +-
 .../org/apache/tajo/function/FunctionUtil.java  |  10 +
 .../tajo/function/PythonInvocationDesc.java     |  98 ++
 .../src/main/proto/CatalogProtos.proto          |   6 +
 .../apache/tajo/catalog/TestFunctionDesc.java   |   4 +-
 tajo-catalog/tajo-catalog-drivers/pom.xml       |  34 +-
 .../tajo-catalog-drivers/tajo-hcatalog/pom.xml  | 739 --------------
 .../tajo/catalog/store/HCatalogStore.java       | 993 -------------------
 .../catalog/store/HCatalogStoreClientPool.java  | 170 ----
 .../apache/tajo/catalog/store/HCatalogUtil.java | 147 ---
 .../tajo/catalog/store/TestHCatalogStore.java   | 467 ---------
 .../tajo-catalog-drivers/tajo-hive/pom.xml      | 351 +++++++
 .../tajo/catalog/store/HiveCatalogStore.java    | 980 ++++++++++++++++++
 .../store/HiveCatalogStoreClientPool.java       | 170 ++++
 .../tajo/catalog/store/HiveCatalogUtil.java     | 127 +++
 .../catalog/store/TestHiveCatalogStore.java     | 504 ++++++++++
 .../tajo/catalog/store/AbstractDBStore.java     |   5 +-
 .../org/apache/tajo/catalog/TestCatalog.java    |  10 +-
 .../org/apache/tajo/cli/tsql/SimpleParser.java  |   2 +-
 .../tajo/client/CatalogAdminClientImpl.java     |  68 +-
 .../org/apache/tajo/client/QueryClientImpl.java |  50 +-
 .../apache/tajo/client/SessionConnection.java   |  40 +-
 .../main/java/org/apache/tajo/QueryVars.java    |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +
 .../java/org/apache/tajo/datum/AnyDatum.java    |  82 ++
 .../java/org/apache/tajo/datum/BlobDatum.java   |   5 +-
 .../org/apache/tajo/datum/DatumFactory.java     |  21 +-
 .../java/org/apache/tajo/json/DatumAdapter.java |   6 +-
 .../java/org/apache/tajo/storage/Tuple.java     |  54 +-
 .../java/org/apache/tajo/util/FileUtil.java     |  23 +
 .../java/org/apache/tajo/util/KeyValueSet.java  |   2 +-
 .../tajo/util/datetime/DateTimeFormat.java      |   2 -
 tajo-core/pom.xml                               | 212 ----
 .../tajo/engine/codegen/EvalCodeGenContext.java |   6 +-
 .../codegen/LegacyFunctionBindingEmitter.java   |   4 +-
 .../engine/codegen/VariablesPreBuilder.java     |   2 +-
 .../tajo/engine/function/FunctionLoader.java    |  59 +-
 .../tajo/engine/function/builtin/StdDev.java    |  94 --
 .../tajo/engine/function/builtin/StdDevPop.java |  10 +-
 .../engine/function/builtin/StdDevSamp.java     |   8 +-
 .../tajo/engine/function/builtin/VarPop.java    |  42 +
 .../engine/function/builtin/VarPopDouble.java   |  39 +
 .../engine/function/builtin/VarPopFloat.java    |  39 +
 .../tajo/engine/function/builtin/VarPopInt.java |  39 +
 .../engine/function/builtin/VarPopLong.java     |  39 +
 .../tajo/engine/function/builtin/VarSamp.java   |  40 +
 .../engine/function/builtin/VarSampDouble.java  |  39 +
 .../engine/function/builtin/VarSampFloat.java   |  39 +
 .../engine/function/builtin/VarSampInt.java     |  39 +
 .../engine/function/builtin/VarSampLong.java    |  39 +
 .../tajo/engine/function/builtin/Variance.java  |  94 ++
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |  10 +
 .../apache/tajo/engine/planner/Projector.java   |   2 +-
 .../rules/GlobalPlanEqualityTester.java         |   2 +-
 .../planner/physical/AggregationExec.java       |   2 +-
 .../planner/physical/BSTIndexScanExec.java      |   2 +-
 .../engine/planner/physical/CommonJoinExec.java |   2 +-
 .../DistinctGroupbyFirstAggregationExec.java    |   2 +-
 .../DistinctGroupbyHashAggregationExec.java     |   2 +-
 .../DistinctGroupbySecondAggregationExec.java   |   2 +-
 .../DistinctGroupbyThirdAggregationExec.java    |   2 +-
 .../engine/planner/physical/EvalExprExec.java   |   2 +-
 .../planner/physical/HashLeftOuterJoinExec.java |   4 +-
 .../engine/planner/physical/HavingExec.java     |   2 +-
 .../engine/planner/physical/SelectionExec.java  |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   4 +-
 .../engine/planner/physical/WindowAggExec.java  |   2 +-
 .../apache/tajo/engine/query/QueryContext.java  |   2 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   2 -
 .../org/apache/tajo/master/QueryInProgress.java |   6 +-
 .../apache/tajo/master/TajoContainerProxy.java  |  38 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  16 +-
 .../apache/tajo/master/exec/QueryExecutor.java  | 100 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  24 +-
 .../tajo/worker/ExecutionBlockContext.java      |  29 +-
 .../tajo/worker/TajoResourceAllocator.java      |  20 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  11 +-
 .../main/java/org/apache/tajo/worker/Task.java  | 123 ++-
 .../apache/tajo/worker/TaskAttemptContext.java  |   7 +
 .../java/org/apache/tajo/worker/TaskRunner.java |   2 -
 .../tajo/worker/WorkerHeartbeatService.java     |  10 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  26 +-
 tajo-core/src/main/proto/InternalTypes.proto    |   2 +-
 tajo-core/src/main/resources/python/__init__.py |  17 +
 .../src/main/resources/python/controller.py     | 330 ++++++
 .../src/main/resources/python/tajo_util.py      | 103 ++
 .../src/main/resources/webapps/admin/query.jsp  |  30 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |   4 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  31 +-
 .../org/apache/tajo/cli/tools/TestTajoDump.java |   4 +-
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   |  56 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |  19 +-
 .../apache/tajo/engine/eval/TestEvalTree.java   |  68 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |  12 +-
 .../engine/function/TestBuiltinFunctions.java   | 123 +++
 .../tajo/engine/function/TestMathFunctions.java |  10 +-
 .../engine/function/TestPythonFunctions.java    |  44 +
 .../tajo/engine/query/TestAlterTablespace.java  |   2 +-
 .../apache/tajo/engine/query/TestCTASQuery.java |   6 +-
 .../tajo/engine/query/TestCreateTable.java      |  14 +-
 .../tajo/engine/query/TestGroupByQuery.java     |  14 +
 .../tajo/engine/query/TestInsertQuery.java      |  40 +-
 .../apache/tajo/engine/query/TestNetTypes.java  |  26 +-
 .../tajo/engine/query/TestSelectQuery.java      |  39 +-
 .../apache/tajo/engine/query/TestSortQuery.java |   6 +-
 .../tajo/engine/query/TestTablePartitions.java  |  22 +-
 .../org/apache/tajo/jdbc/TestResultSet.java     |   4 +-
 .../tajo/jdbc/TestTajoDatabaseMetaData.java     |  22 +-
 .../java/org/apache/tajo/jdbc/TestTajoJdbc.java |  12 +-
 tajo-core/src/test/resources/python/__init__.py |  17 +
 .../src/test/resources/python/test_funcs.py     |  33 +
 .../src/test/resources/python/test_funcs.pyc    | Bin 0 -> 1042 bytes
 .../src/test/resources/python/test_funcs2.py    |  32 +
 .../create_table_various_types_for_hcatalog.sql |  50 -
 ...ate_table_various_types_for_hive_catalog.sql |  50 +
 .../testGroupbyWithPythonFunc.sql               |   1 +
 .../testGroupbyWithPythonFunc2.sql              |   1 +
 .../testNestedPythonFunction.sql                |   1 +
 .../TestSelectQuery/testSelectPythonFuncs.sql   |   2 +
 .../testSelectWithParentheses1.sql              |   1 +
 .../testSelectWithParentheses2.sql              |   1 +
 .../testSelectWithPredicateOnPythonFunc.sql     |   1 +
 .../testGroupbyWithPythonFunc.result            |   7 +
 .../testGroupbyWithPythonFunc2.result           |   7 +
 .../testNestedPythonFunction.result             |   7 +
 .../testSelectPythonFuncs.result                |   7 +
 .../testSelectWithParentheses1.result           |   3 +
 .../testSelectWithParentheses2.result           |   3 +
 .../testSelectWithPredicateOnPythonFunc.result  |  17 +
 tajo-dist/pom.xml                               |   4 +-
 tajo-dist/src/main/bin/tajo                     |   3 -
 .../src/main/conf/catalog-site.xml.template     |   6 +-
 tajo-dist/src/main/conf/tajo-env.sh             |   2 +-
 .../configuration/catalog_configuration.rst     |  40 +-
 tajo-docs/src/main/sphinx/functions.rst         |  70 +-
 .../src/main/sphinx/functions/json_func.rst     |   1 +
 .../src/main/sphinx/hcatalog_integration.rst    |  52 -
 tajo-docs/src/main/sphinx/hive_integration.rst  |  42 +
 tajo-docs/src/main/sphinx/index.rst             |   2 +-
 .../org/apache/tajo/plan/ExprAnnotator.java     |  26 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |   9 +-
 .../apache/tajo/plan/expr/AlgebraicUtil.java    |   6 +-
 .../tajo/plan/expr/BetweenPredicateEval.java    |  28 +-
 .../org/apache/tajo/plan/expr/CastEval.java     |  21 +-
 .../org/apache/tajo/plan/expr/EvalContext.java  |  45 +
 .../org/apache/tajo/plan/expr/EvalNode.java     |   5 +-
 .../org/apache/tajo/plan/expr/EvalTreeUtil.java |   4 +-
 .../org/apache/tajo/plan/expr/FieldEval.java    |   4 +-
 .../org/apache/tajo/plan/expr/FunctionEval.java |   4 +-
 .../tajo/plan/expr/GeneralFunctionEval.java     |  56 +-
 .../plan/expr/PatternMatchPredicateEval.java    |   4 +-
 .../exprrewrite/EvalTreeOptimizationRule.java   |   3 +-
 .../plan/exprrewrite/EvalTreeOptimizer.java     |   1 +
 .../plan/exprrewrite/rules/ConstantFolding.java |  33 +-
 .../tajo/plan/function/FunctionInvoke.java      |  90 ++
 .../plan/function/FunctionInvokeContext.java    |  74 ++
 .../function/LegacyScalarFunctionInvoke.java    |  81 ++
 .../plan/function/PythonFunctionInvoke.java     |  59 ++
 .../function/python/PythonScriptEngine.java     | 368 +++++++
 .../plan/function/python/TajoScriptEngine.java  |  83 ++
 .../tajo/plan/function/stream/BufferPool.java   |  74 ++
 .../function/stream/ByteBufInputChannel.java    |  71 ++
 .../plan/function/stream/ByteBufLineReader.java | 176 ++++
 .../function/stream/CSVLineDeserializer.java    |  99 ++
 .../tajo/plan/function/stream/CSVLineSerDe.java |  42 +
 .../plan/function/stream/CSVLineSerializer.java | 118 +++
 .../stream/FieldSerializerDeserializer.java     |  36 +
 .../function/stream/FieldSplitProcessor.java    |  34 +
 .../tajo/plan/function/stream/InputHandler.java |  78 ++
 .../function/stream/LineSplitProcessor.java     |  45 +
 .../plan/function/stream/OutputHandler.java     | 156 +++
 .../plan/function/stream/StreamingUtil.java     |  91 ++
 .../stream/TextFieldSerializerDeserializer.java | 257 +++++
 .../function/stream/TextLineDeserializer.java   |  60 ++
 .../function/stream/TextLineParsingError.java   |  31 +
 .../plan/function/stream/TextLineSerDe.java     |  65 ++
 .../function/stream/TextLineSerializer.java     |  45 +
 .../plan/rewrite/rules/FilterPushDownRule.java  |   2 +-
 .../rules/LogicalPlanEqualityTester.java        |   2 +-
 .../rewrite/rules/PartitionedTableRewriter.java |   2 +-
 .../tajo/plan/serder/EvalNodeDeserializer.java  |  16 +-
 .../tajo/plan/serder/EvalNodeSerializer.java    |  10 +-
 .../plan/serder/LogicalNodeDeserializer.java    | 143 +--
 tajo-plan/src/main/proto/Plan.proto             |  28 +-
 tajo-project/pom.xml                            |   1 +
 .../main/java/org/apache/tajo/rpc/RpcUtils.java |  34 -
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  58 +-
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |  82 +-
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  88 +-
 .../org/apache/tajo/rpc/BlockingRpcServer.java  |  85 +-
 .../tajo/rpc/ConnectionCloseFutureListener.java |  35 +
 .../org/apache/tajo/rpc/NettyClientBase.java    | 124 +--
 .../tajo/rpc/ProtoChannelInitializer.java       |  11 +-
 .../org/apache/tajo/rpc/RpcClientManager.java   | 185 ++++
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 191 ----
 .../org/apache/tajo/rpc/ServerCallable.java     |  36 +-
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |  72 +-
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  85 +-
 .../apache/tajo/rpc/TestRpcClientManager.java   |  97 ++
 .../org/apache/tajo/storage/TestLazyTuple.java  |   2 +-
 .../testErrorTolerance1.json                    |  12 +-
 .../dataset/TestJsonSerDe/testVariousType.json  |   2 +-
 208 files changed, 7771 insertions(+), 4298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 3b72639,49be29a..c967b9d
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -373,27 -372,9 +373,27 @@@ public abstract class AbstractCatalogCl
    }
  
    @Override
 +  public List<IndexDescProto> getAllIndexes() {
 +    try {
-       return new ServerCallable<List<IndexDescProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
++      return new ServerCallable<List<IndexDescProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 +
 +        @Override
 +        public List<IndexDescProto> call(NettyClientBase client) throws Exception {
 +          CatalogProtocolService.BlockingInterface stub = getStub(client);
 +          GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
 +          return response.getIndexList();
 +        }
 +      }.withRetries();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
    public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
      try {
-       return new ServerCallable<PartitionMethodDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+       return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
          public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
  
            TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@@ -637,40 -618,17 +637,40 @@@
    }
  
    @Override
 -  public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
 +  public boolean existIndexByColumns(final String databaseName, final String tableName, final Column [] columns) {
 +    return existIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
 +  }
 +
 +  @Override
 +  public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) {
      try {
-       return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+       return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
          public Boolean call(NettyClientBase client) throws ServiceException {
  
 -          GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
 +          GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
            builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
 -          builder.setColumnName(columnName);
 +          for (String columnName : columnNames) {
 +            builder.addColumnNames(columnName);
 +          }
  
            CatalogProtocolService.BlockingInterface stub = getStub(client);
 -          return stub.existIndexByColumn(null, builder.build()).getValue();
 +          return stub.existIndexByColumnNames(null, builder.build()).getValue();
 +        }
 +      }.withRetries();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return false;
 +    }
 +  }
 +
 +  @Override
 +  public boolean existIndexesByTable(final String databaseName, final String tableName) {
 +    try {
-       return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
++      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 +        public Boolean call(NettyClientBase client) throws ServiceException {
 +
 +          CatalogProtocolService.BlockingInterface stub = getStub(client);
 +          return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
          }
        }.withRetries();
      } catch (ServiceException e) {
@@@ -699,60 -657,20 +699,60 @@@
      }
    }
  
 +  private static String[] extractColumnNames(Column[] columns) {
 +    String[] columnNames = new String [columns.length];
 +    for (int i = 0; i < columnNames.length; i++) {
 +      columnNames[i] = columns[i].getSimpleName();
 +    }
 +    return columnNames;
 +  }
 +
 +  @Override
 +  public final IndexDesc getIndexByColumns(final String databaseName,
 +                                               final String tableName,
 +                                               final Column [] columns) {
 +    return getIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
 +  }
 +
    @Override
 -  public final IndexDesc getIndexByColumn(final String databaseName,
 -                                          final String tableName,
 -                                          final String columnName) {
 +  public final IndexDesc getIndexByColumnNames(final String databaseName,
 +                                           final String tableName,
 +                                           final String [] columnNames) {
      try {
-       return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+       return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
          public IndexDesc call(NettyClientBase client) throws ServiceException {
  
 -          GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
 +          GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
            builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
 -          builder.setColumnName(columnName);
 +          for (String columnName : columnNames) {
 +            builder.addColumnNames(columnName);
 +          }
  
            CatalogProtocolService.BlockingInterface stub = getStub(client);
 -          return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
 +          return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
 +        }
 +      }.withRetries();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
 +  public final Collection<IndexDesc> getAllIndexesByTable(final String databaseName,
 +                                                          final String tableName) {
 +    try {
-       return new ServerCallable<Collection<IndexDesc>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
++      return new ServerCallable<Collection<IndexDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 +        @Override
 +        public Collection<IndexDesc> call(NettyClientBase client) throws Exception {
 +          TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
 +          CatalogProtocolService.BlockingInterface stub = getStub(client);
 +          GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
 +          List<IndexDesc> indexDescs = TUtil.newList();
 +          for (IndexDescProto descProto : response.getIndexDescList()) {
 +            indexDescs.add(new IndexDesc(descProto));
 +          }
 +          return indexDescs;
          }
        }.withRetries();
      } catch (ServiceException e) {
@@@ -783,9 -701,27 +783,9 @@@
    }
    
    @Override
 -  public List<IndexProto> getAllIndexes() {
 -    try {
 -      return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 -
 -        @Override
 -        public List<IndexProto> call(NettyClientBase client) throws Exception {
 -          CatalogProtocolService.BlockingInterface stub = getStub(client);
 -          GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
 -          return response.getIndexList();
 -        }
 -      }.withRetries();
 -    } catch (ServiceException e) {
 -      LOG.error(e.getMessage(), e);
 -      return null;
 -    }
 -  }
 -
 -  @Override
    public final boolean createFunction(final FunctionDesc funcDesc) {
      try {
-       return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+       return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
          public Boolean call(NettyClientBase client) throws ServiceException {
            CatalogProtocolService.BlockingInterface stub = getStub(client);
            return stub.createFunction(null, funcDesc.getProto()).getValue();

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 3a6014d,dcfad8d..0965bc8
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@@ -24,9 -24,7 +24,8 @@@ import org.apache.tajo.DataTypeUtil
  import org.apache.tajo.TajoConstants;
  import org.apache.tajo.catalog.partition.PartitionMethodDesc;
  import org.apache.tajo.catalog.proto.CatalogProtos;
- import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
  import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
 +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
  import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
  import org.apache.tajo.common.TajoDataTypes;
  import org.apache.tajo.common.TajoDataTypes.DataType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 0000000,5b1a996..700b327
mode 000000,100644..100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@@ -1,0 -1,964 +1,980 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.catalog.store;
+ 
+ import com.google.common.collect.Lists;
+ 
+ import org.apache.commons.lang.StringEscapeUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.serde.serdeConstants;
+ import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+ import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+ import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+ import org.apache.tajo.TajoConstants;
+ import org.apache.tajo.catalog.*;
+ import org.apache.tajo.catalog.exception.*;
+ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+ import org.apache.tajo.catalog.proto.CatalogProtos;
+ import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
++import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.common.TajoDataTypes;
+ import org.apache.tajo.common.exception.NotImplementedException;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.exception.InternalException;
+ import org.apache.tajo.storage.StorageConstants;
+ import org.apache.tajo.util.KeyValueSet;
+ import org.apache.thrift.TException;
+ 
+ import java.io.IOException;
+ import java.util.*;
+ 
+ import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+ 
+ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
+   protected final Log LOG = LogFactory.getLog(getClass());
+ 
+   private static String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir";
+ 
+   protected Configuration conf;
+   private static final int CLIENT_POOL_SIZE = 2;
+   private final HiveCatalogStoreClientPool clientPool;
+   private final String defaultTableSpaceUri;
+ 
+   public HiveCatalogStore(final Configuration conf) throws InternalException {
+     if (!(conf instanceof TajoConf)) {
+       throw new CatalogException("Invalid Configuration Type:" + conf.getClass().getSimpleName());
+     }
+     this.conf = conf;
+     this.defaultTableSpaceUri = TajoConf.getWarehouseDir((TajoConf) conf).toString();
+     this.clientPool = new HiveCatalogStoreClientPool(CLIENT_POOL_SIZE, conf);
+   }
+ 
+   @Override
+   public boolean existTable(final String databaseName, final String tableName) throws CatalogException {
+     boolean exist = false;
+     org.apache.hadoop.hive.ql.metadata.Table table;
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     // get table
+     try {
+       client = clientPool.getClient();
+       table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+       if (table != null) {
+         exist = true;
+       }
+     } catch (NoSuchObjectException nsoe) {
+       exist = false;
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+ 
+     return exist;
+   }
+ 
+   @Override
+   public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName) throws CatalogException {
+     org.apache.hadoop.hive.ql.metadata.Table table = null;
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     Path path = null;
+     CatalogProtos.StoreType storeType = null;
+     org.apache.tajo.catalog.Schema schema = null;
+     KeyValueSet options = null;
+     TableStats stats = null;
+     PartitionMethodDesc partitions = null;
+ 
+     //////////////////////////////////
+     // set tajo table schema.
+     //////////////////////////////////
+     try {
+       // get hive table schema
+       try {
+         client = clientPool.getClient();
+         table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+         path = table.getPath();
+       } catch (NoSuchObjectException nsoe) {
+         throw new CatalogException("Table not found. - tableName:" + tableName, nsoe);
+       } catch (Exception e) {
+         throw new CatalogException(e);
+       }
+ 
+       // convert HiveCatalogStore field schema into tajo field schema.
+       schema = new org.apache.tajo.catalog.Schema();
+ 
+       List<FieldSchema> fieldSchemaList = table.getCols();
+       boolean isPartitionKey = false;
+       for (FieldSchema eachField : fieldSchemaList) {
+         isPartitionKey = false;
+ 
+         if (table.getPartitionKeys() != null) {
+           for (FieldSchema partitionKey : table.getPartitionKeys()) {
+             if (partitionKey.getName().equals(eachField.getName())) {
+               isPartitionKey = true;
+             }
+           }
+         }
+ 
+         if (!isPartitionKey) {
+           String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+               CatalogConstants.IDENTIFIER_DELIMITER + eachField.getName();
+           TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(eachField.getType().toString());
+           schema.addColumn(fieldName, dataType);
+         }
+       }
+ 
+       // validate field schema.
+       HiveCatalogUtil.validateSchema(table);
+ 
+       stats = new TableStats();
+       options = new KeyValueSet();
+       options.putAll(table.getParameters());
+       options.remove("EXTERNAL");
+ 
+       Properties properties = table.getMetadata();
+       if (properties != null) {
+         // set field delimiter
+         String fieldDelimiter = "", nullFormat = "";
+         if (properties.getProperty(serdeConstants.FIELD_DELIM) != null) {
+           fieldDelimiter = properties.getProperty(serdeConstants.FIELD_DELIM);
+         } else {
+           // if hive table used default row format delimiter, Properties doesn't have it.
+           // So, Tajo must set as follows:
+           fieldDelimiter = "\u0001";
+         }
+ 
+         // set null format
+         if (properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT) != null) {
+           nullFormat = properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT);
+         } else {
+           nullFormat = "\\N";
+         }
+         options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT);
+ 
+         // set file output format
+         String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
+         storeType = CatalogUtil.getStoreType(HiveCatalogUtil.getStoreType(fileOutputformat));
+ 
+         if (storeType.equals(CatalogProtos.StoreType.TEXTFILE)) {
+           options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
+           options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat));
+         } else if (storeType.equals(CatalogProtos.StoreType.RCFILE)) {
+           options.set(StorageConstants.RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
+           String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+           if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
+           } else if (ColumnarSerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
+           }
+         } else if (storeType.equals(CatalogProtos.StoreType.SEQUENCEFILE) ) {
+           options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
+           options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
+           String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+           if (LazyBinarySerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
+           } else if (LazySimpleSerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
+           }
+         }
+ 
+         // set data size
+         long totalSize = 0;
+         if (properties.getProperty("totalSize") != null) {
+           totalSize = Long.parseLong(properties.getProperty("totalSize"));
+         } else {
+           try {
+             FileSystem fs = path.getFileSystem(conf);
+             if (fs.exists(path)) {
+               totalSize = fs.getContentSummary(path).getLength();
+             }
+           } catch (IOException ioe) {
+             throw new CatalogException("Fail to get path. - path:" + path.toString(), ioe);
+           }
+         }
+         stats.setNumBytes(totalSize);
+       }
+ 
+       // set partition keys
+       List<FieldSchema> partitionKeys = table.getPartitionKeys();
+ 
+       if (null != partitionKeys) {
+         org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
+         StringBuilder sb = new StringBuilder();
+         if (partitionKeys.size() > 0) {
+           for (int i = 0; i < partitionKeys.size(); i++) {
+             FieldSchema fieldSchema = partitionKeys.get(i);
+             TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(fieldSchema.getType().toString());
+             String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+                 CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName();
+             expressionSchema.addColumn(new Column(fieldName, dataType));
+             if (i > 0) {
+               sb.append(",");
+             }
+             sb.append(fieldSchema.getName());
+           }
+           partitions = new PartitionMethodDesc(
+               databaseName,
+               tableName,
+               PartitionType.COLUMN,
+               sb.toString(),
+               expressionSchema);
+         }
+       }
+     } finally {
+       if(client != null) client.release();
+     }
+     TableMeta meta = new TableMeta(storeType, options);
+     TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, schema, meta, path.toUri());
+     if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) {
+       tableDesc.setExternal(true);
+     }
+     if (stats != null) {
+       tableDesc.setStats(stats);
+     }
+     if (partitions != null) {
+       tableDesc.setPartitionMethod(partitions);
+     }
+     return tableDesc.getProto();
+   }
+ 
+ 
+   private TajoDataTypes.Type getDataType(final String typeStr) {
+     try {
+       return Enum.valueOf(TajoDataTypes.Type.class, typeStr);
+     } catch (IllegalArgumentException iae) {
+       LOG.error("Cannot find a matched type against from '" + typeStr + "'");
+       return null;
+     }
+   }
+ 
+   @Override
+   public final List<String> getAllTableNames(String databaseName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       return client.getHiveClient().getAllTables(databaseName);
+     } catch (TException e) {
+       throw new CatalogException(e);
+     } finally {
+       if(client != null) client.release();
+     }
+   }
+ 
+   @Override
+   public void createTablespace(String spaceName, String spaceUri) throws CatalogException {
+     // SKIP
+   }
+ 
+   @Override
+   public boolean existTablespace(String spaceName) throws CatalogException {
+     // SKIP
+     return spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME);
+   }
+ 
+   @Override
+   public void dropTablespace(String spaceName) throws CatalogException {
+     // SKIP
+   }
+ 
+   @Override
+   public Collection<String> getAllTablespaceNames() throws CatalogException {
+     return Lists.newArrayList(TajoConstants.DEFAULT_TABLESPACE_NAME);
+   }
+ 
+   @Override
+   public TablespaceProto getTablespace(String spaceName) throws CatalogException {
+     if (spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME)) {
+       TablespaceProto.Builder builder = TablespaceProto.newBuilder();
+       builder.setSpaceName(TajoConstants.DEFAULT_TABLESPACE_NAME);
+       builder.setUri(defaultTableSpaceUri);
+       return builder.build();
+     } else {
+       throw new CatalogException("tablespace concept is not supported in HiveCatalogStore");
+     }
+   }
+ 
+   @Override
+   public void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws
+     CatalogException {
+     // TODO - not implemented yet
+   }
+ 
+   @Override
+   public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) throws CatalogException {
+     throw new CatalogException("tablespace concept is not supported in HiveCatalogStore");
+   }
+ 
+   @Override
+   public void createDatabase(String databaseName, String tablespaceName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       Database database = new Database(
+           databaseName,
+           "",
+           defaultTableSpaceUri + "/" + databaseName,
+           new HashMap<String, String>());
+       client = clientPool.getClient();
+       client.getHiveClient().createDatabase(database);
+     } catch (AlreadyExistsException e) {
+       throw new AlreadyExistsDatabaseException(databaseName);
+     } catch (Throwable t) {
+       throw new CatalogException(t);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public boolean existDatabase(String databaseName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       List<String> databaseNames = client.getHiveClient().getAllDatabases();
+       return databaseNames.contains(databaseName);
+     } catch (Throwable t) {
+       throw new CatalogException(t);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public void dropDatabase(String databaseName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       client.getHiveClient().dropDatabase(databaseName);
+     } catch (NoSuchObjectException e) {
+       throw new NoSuchDatabaseException(databaseName);
+     } catch (Throwable t) {
+       throw new CatalogException(databaseName);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public Collection<String> getAllDatabaseNames() throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       return client.getHiveClient().getAllDatabases();
+     } catch (TException e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public final void createTable(final CatalogProtos.TableDescProto tableDescProto) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     TableDesc tableDesc = new TableDesc(tableDescProto);
+     String[] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+     String databaseName = splitted[0];
+     String tableName = splitted[1];
+ 
+     try {
+       client = clientPool.getClient();
+ 
+       org.apache.hadoop.hive.metastore.api.Table table = new org.apache.hadoop.hive.metastore.api.Table();
+       table.setDbName(databaseName);
+       table.setTableName(tableName);
+       table.setParameters(new HashMap<String, String>(tableDesc.getMeta().getOptions().getAllKeyValus()));
+       // TODO: set owner
+       //table.setOwner();
+ 
+       StorageDescriptor sd = new StorageDescriptor();
+       sd.setSerdeInfo(new SerDeInfo());
+       sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+       sd.getSerdeInfo().setName(table.getTableName());
+ 
+       // If tajo calls the set-location method, the thrift client throws an exception as follows:
+       // Caused by: MetaException(message:java.lang.NullPointerException)
+       // If you want to modify the table path, you have to do it from the Hive CLI.
+       if (tableDesc.isExternal()) {
+         table.setTableType(TableType.EXTERNAL_TABLE.name());
+         table.putToParameters("EXTERNAL", "TRUE");
+ 
+         Path tablePath = new Path(tableDesc.getPath());
+         FileSystem fs = tablePath.getFileSystem(conf);
+         if (fs.isFile(tablePath)) {
+           LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
+           sd.setLocation(tablePath.getParent().toString());
+         } else {
+           sd.setLocation(tablePath.toString());
+         }
+       }
+ 
+       // set column information
+       List<Column> columns = tableDesc.getSchema().getColumns();
+       ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(columns.size());
+ 
+       for (Column eachField : columns) {
+         cols.add(new FieldSchema(eachField.getSimpleName(),
+             HiveCatalogUtil.getHiveFieldType(eachField.getDataType()), ""));
+       }
+       sd.setCols(cols);
+ 
+       // set partition keys
+       if (tableDesc.hasPartition() && tableDesc.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
+         List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>();
+         for (Column eachPartitionKey : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) {
+           partitionKeys.add(new FieldSchema(eachPartitionKey.getSimpleName(),
+               HiveCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType()), ""));
+         }
+         table.setPartitionKeys(partitionKeys);
+       }
+ 
+       if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.RCFILE)) {
+         String serde = tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE);
+         sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
+         sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
+         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
+           sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+         } else {
+           sd.getSerdeInfo().setSerializationLib(
+               org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName());
+         }
+ 
+         if (tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) {
+           table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+               StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL)));
+         }
+       } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.CSV)
+           || tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.TEXTFILE)) {
+         sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+         sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName());
+         sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName());
+ 
+         String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER,
+             StorageConstants.DEFAULT_FIELD_DELIMITER);
+ 
+         // A user can use a unicode character such as \u0001 or \001 as a field delimiter.
+         // In this case, the java console will convert this value into "\\u0001",
+         // and hive will un-escape this value again.
+         // As a result, the user gets the right field delimiter.
+         // So, we have to un-escape this value.
+         sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT,
+             StringEscapeUtils.unescapeJava(fieldDelimiter));
+         sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM,
+             StringEscapeUtils.unescapeJava(fieldDelimiter));
+         table.getParameters().remove(StorageConstants.TEXT_DELIMITER);
+ 
+         if (tableDesc.getMeta().containsOption(StorageConstants.TEXT_NULL)) {
+           table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+               StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.TEXT_NULL)));
+           table.getParameters().remove(StorageConstants.TEXT_NULL);
+         }
+       } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.SEQUENCEFILE)) {
+         String serde = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE);
+         sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName());
+         sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName());
+ 
+         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
+           sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+ 
+           String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
+               StorageConstants.DEFAULT_FIELD_DELIMITER);
+ 
+           // A user can use a unicode character such as \u0001 or \001 as a field delimiter.
+           // In this case, the java console will convert this value into "\\u0001",
+           // and hive will un-escape this value again.
+           // As a result, the user gets the right field delimiter.
+           // So, we have to un-escape this value.
+           sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT,
+               StringEscapeUtils.unescapeJava(fieldDelimiter));
+           sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM,
+               StringEscapeUtils.unescapeJava(fieldDelimiter));
+           table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER);
+         } else {
+           sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName());
+         }
+ 
+         if (tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) {
+           table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+               StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL)));
+           table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL);
+         }
+       } else {
+         if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.PARQUET)) {
+           sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName());
+           sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName());
+           sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName());
+         } else {
+           throw new CatalogException(new NotImplementedException(tableDesc.getMeta().getStoreType
+               ().name()));
+         }
+       }
+ 
+       sd.setSortCols(new ArrayList<Order>());
+ 
+       table.setSd(sd);
+       client.getHiveClient().createTable(table);
+     } catch (RuntimeException e) {
+       throw e;
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if(client != null) client.release();
+     }
+   }
+ 
+   @Override
+   public final void dropTable(String databaseName, final String tableName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       client.getHiveClient().dropTable(databaseName, tableName, false, false);
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+ 
+   @Override
+   public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException {
+     final String[] split = CatalogUtil.splitFQTableName(alterTableDescProto.getTableName());
+ 
+     if (split.length == 1) {
+       throw new IllegalArgumentException("alterTable() requires a qualified table name, but it is \""
+           + alterTableDescProto.getTableName() + "\".");
+     }
+ 
+     final String databaseName = split[0];
+     final String tableName = split[1];
+     String partitionName = null;
+     CatalogProtos.PartitionDescProto partitionDesc = null;
+ 
+     switch (alterTableDescProto.getAlterTableType()) {
+       case RENAME_TABLE:
+         if (existTable(databaseName,alterTableDescProto.getNewTableName().toLowerCase())) {
+           throw new AlreadyExistsTableException(alterTableDescProto.getNewTableName());
+         }
+         renameTable(databaseName, tableName, alterTableDescProto.getNewTableName().toLowerCase());
+         break;
+       case RENAME_COLUMN:
+         if (existColumn(databaseName,tableName, alterTableDescProto.getAlterColumnName().getNewColumnName())) {
+           throw new ColumnNameAlreadyExistException(alterTableDescProto.getAlterColumnName().getNewColumnName());
+         }
+         renameColumn(databaseName, tableName, alterTableDescProto.getAlterColumnName());
+         break;
+       case ADD_COLUMN:
+         if (existColumn(databaseName,tableName, alterTableDescProto.getAddColumn().getName())) {
+           throw new ColumnNameAlreadyExistException(alterTableDescProto.getAddColumn().getName());
+         }
+         addNewColumn(databaseName, tableName, alterTableDescProto.getAddColumn());
+         break;
+       case ADD_PARTITION:
+         partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
+         partitionDesc = getPartition(databaseName, tableName, partitionName);
+         if(partitionDesc != null) {
+           throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName);
+         }
+         addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc());
+         break;
+       case DROP_PARTITION:
+         partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
+         partitionDesc = getPartition(databaseName, tableName, partitionName);
+         if(partitionDesc == null) {
+           throw new NoSuchPartitionException(databaseName, tableName, partitionName);
+         }
+         dropPartition(databaseName, tableName, partitionDesc);
+         break;
+       case SET_PROPERTY:
+         // TODO - not implemented yet
+         break;
+       default:
+         //TODO
+     }
+   }
+ 
+ 
+   private void renameTable(String databaseName, String tableName, String newTableName) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+       client = clientPool.getClient();
+       Table newTable = client.getHiveClient().getTable(databaseName, tableName);
+       newTable.setTableName(newTableName);
+       client.getHiveClient().alter_table(databaseName, tableName, newTable);
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   private void renameColumn(String databaseName, String tableName, CatalogProtos.AlterColumnProto alterColumnProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       List<FieldSchema> columns = table.getSd().getCols();
+ 
+       for (final FieldSchema currentColumn : columns) {
+         if (currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName())) {
+           currentColumn.setName(alterColumnProto.getNewColumnName());
+         }
+       }
+       client.getHiveClient().alter_table(databaseName, tableName, table);
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+ 
+   private void addNewColumn(String databaseName, String tableName, CatalogProtos.ColumnProto columnProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       List<FieldSchema> columns = table.getSd().getCols();
+       columns.add(new FieldSchema(columnProto.getName(),
+           HiveCatalogUtil.getHiveFieldType(columnProto.getDataType()), ""));
+       client.getHiveClient().alter_table(databaseName, tableName, table);
+ 
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   private void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto
+     partitionDescProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+ 
+       Partition partition = new Partition();
+       partition.setDbName(databaseName);
+       partition.setTableName(tableName);
+ 
+       List<String> values = Lists.newArrayList();
+       for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) {
+         values.add(keyProto.getPartitionValue());
+       }
+       partition.setValues(values);
+ 
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       StorageDescriptor sd = table.getSd();
+       sd.setLocation(partitionDescProto.getPath());
+       partition.setSd(sd);
+ 
+       client.getHiveClient().add_partition(partition);
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   private void dropPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto
+     partitionDescProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+ 
+       List<String> values = Lists.newArrayList();
+       for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) {
+         values.add(keyProto.getPartitionValue());
+       }
+       client.getHiveClient().dropPartition(databaseName, tableName, values, true);
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName)
+       throws CatalogException {
 -    return null;  // TODO - not implemented yet
++    // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public boolean existPartitionMethod(String databaseName, String tableName) throws CatalogException {
 -    return false;  // TODO - not implemented yet
++    // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void dropPartitionMethod(String databaseName, String tableName) throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName,
+                                                          String tableName) throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+ 
+   @Override
+   public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName,
+                                                        String partitionName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     CatalogProtos.PartitionDescProto.Builder builder = null;
+ 
+     try {
+       client = clientPool.getClient();
+ 
+       Partition partition = client.getHiveClient().getPartition(databaseName, tableName, partitionName);
+       builder = CatalogProtos.PartitionDescProto.newBuilder();
+       builder.setPartitionName(partitionName);
+       builder.setPath(partition.getSd().getLocation());
+ 
+       String[] partitionNames = partitionName.split("/");
+ 
+       for (int i = 0; i < partition.getValues().size(); i++) {
+         String value = partition.getValues().get(i);
+         CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder();
+ 
+         String columnName = partitionNames[i].split("=")[0];
+         keyBuilder.setColumnName(columnName);
+         keyBuilder.setPartitionValue(value);
+         builder.addPartitionKeys(keyBuilder);
+       }
+     } catch (NoSuchObjectException e) {
+       return null;
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+     return builder.build();
+   }
+ 
+   @Override
+   public final void addFunction(final FunctionDesc func) throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final void deleteFunction(final FunctionDesc func) throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final void existFunction(final FunctionDesc func) throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final List<String> getAllFunctionNames() throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public void dropIndex(String databaseName, String indexName) throws CatalogException {
++  public void createIndex(CatalogProtos.IndexDescProto proto) throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public boolean existIndexByName(String databaseName, String indexName) throws CatalogException {
++  public void dropIndex(String databaseName, String indexName) throws CatalogException {
+     // TODO - not implemented yet
 -    return false;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public CatalogProtos.IndexDescProto[] getIndexes(String databaseName, String tableName) throws CatalogException {
++  public CatalogProtos.IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public void createIndex(CatalogProtos.IndexDescProto proto) throws CatalogException {
++  public CatalogProtos.IndexDescProto getIndexByColumns(String databaseName, String tableName, String[] columnNames)
++      throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public CatalogProtos.IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException {
++  public boolean existIndexByName(String databaseName, String indexName) throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName, String tableName, String columnName)
++  public boolean existIndexByColumns(String databaseName, String tableName, String[] columnNames)
+       throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public boolean existIndexByColumn(String databaseName, String tableName, String columnName) throws CatalogException {
++  public List<String> getAllIndexNamesByTable(String databaseName, String tableName) throws CatalogException {
+     // TODO - not implemented yet
 -    return false;
++    throw new UnsupportedOperationException();
++  }
++
++  @Override
++  public boolean existIndexesByTable(String databaseName, String tableName) throws CatalogException {
++    // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final void close() {
+     clientPool.close();
+   }
+ 
+   private boolean existColumn(final String databaseName ,final String tableName , final String columnName) throws CatalogException {
+     boolean exist = false;
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+ 
+       client = clientPool.getClient();
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       List<FieldSchema> columns = table.getSd().getCols();
+ 
+       for (final FieldSchema currentColumn : columns) {
+         if (currentColumn.getName().equalsIgnoreCase(columnName)) {
+           exist = true;
+         }
+       }
+       client.getHiveClient().alter_table(databaseName, tableName, table);
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+ 
+     return exist;
+   }
+ 
+   @Override
+   public List<ColumnProto> getAllColumns() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<DatabaseProto> getAllDatabases() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public List<IndexProto> getAllIndexes() throws CatalogException {
++  public List<IndexDescProto> getAllIndexes() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TablePartitionProto> getAllPartitions() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TableOptionProto> getAllTableOptions() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TableStatsProto> getAllTableStats() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TableDescriptorProto> getAllTables() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TablespaceProto> getTablespaces() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 90a51b6,9d0e427..5fa1c67
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@@ -259,124 -260,6 +259,124 @@@ public class CatalogAdminClientImpl imp
    }
  
    @Override
 +  public IndexDescProto getIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
++    return new ServerCallable<IndexDescProto>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 +
 +      @Override
 +      public IndexDescProto call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.getIndexWithName(null,
 +            connection.convertSessionedString(indexName));
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.existIndexWithName(null,
 +            connection.convertSessionedString(indexName)).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException {
-     return new ServerCallable<List<IndexDescProto>>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
++    return new ServerCallable<List<IndexDescProto>>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 +
 +      @Override
 +      public List<IndexDescProto> call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
 +            connection.convertSessionedString(tableName));
 +        if (response.getResult().getResultCode() == ResultCode.OK) {
 +          return response.getIndexesList();
 +        } else {
 +          throw new SQLException(response.getResult().getErrorMessage());
 +        }
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean hasIndexes(final String tableName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.existIndexesForTable(null,
 +            connection.convertSessionedString(tableName)).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
++    return new ServerCallable<IndexDescProto>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 +
 +      @Override
 +      public IndexDescProto call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
 +        builder.setSessionId(connection.sessionId);
 +        builder.setTableName(tableName);
 +        for (String eachColumnName : columnNames) {
 +          builder.addColumnNames(eachColumnName);
 +        }
 +        GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
 +        if (response.getResult().getResultCode() == ResultCode.OK) {
 +          return response.getIndexDesc();
 +        } else {
 +          throw new SQLException(response.getResult().getErrorMessage());
 +        }
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
 +        builder.setSessionId(connection.sessionId);
 +        builder.setTableName(tableName);
 +        for (String eachColumnName : columnName) {
 +          builder.addColumnNames(eachColumnName);
 +        }
 +        return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean dropIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.dropIndex(null,
 +            connection.convertSessionedString(indexName)).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
    public void close() throws IOException {
    }
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 02ba3cd,ad1a8e3..5f30fdd
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@@ -50,8 -47,11 +50,12 @@@ import org.apache.tajo.master.*
  import org.apache.tajo.master.exec.prehook.CreateTableHook;
  import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
  import org.apache.tajo.master.exec.prehook.InsertIntoHook;
+ import org.apache.tajo.plan.expr.EvalContext;
+ import org.apache.tajo.plan.expr.GeneralFunctionEval;
+ import org.apache.tajo.plan.function.python.PythonScriptEngine;
+ import org.apache.tajo.plan.function.python.TajoScriptEngine;
  import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
  import org.apache.tajo.querymaster.*;
  import org.apache.tajo.session.Session;
  import org.apache.tajo.plan.LogicalPlan;
@@@ -272,40 -262,69 +274,88 @@@ public class QueryExecutor 
      response.setQueryId(queryInfo.getQueryId().getProto());
      response.setMaxRowNum(maxRow);
      response.setTableDesc(desc.getProto());
 -    response.setResultCode(ClientProtos.ResultCode.OK);
 +    response.setResult(IPCUtil.buildOkRequestResult());
    }
  
-   public void execNonFromQuery(QueryContext queryContext, Session session, String query,
-                                LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) throws Exception {
+   public void execNonFromQuery(QueryContext queryContext, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder)
+       throws Exception {
      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
  
+     EvalContext evalContext = new EvalContext();
      Target[] targets = plan.getRootBlock().getRawTargets();
      if (targets == null) {
        throw new PlanningException("No targets");
      }
-     final Tuple outTuple = new VTuple(targets.length);
+     try {
+       // start script executor
+       startScriptExecutors(queryContext, evalContext, targets);
+       final Tuple outTuple = new VTuple(targets.length);
+       for (int i = 0; i < targets.length; i++) {
+         EvalNode eval = targets[i].getEvalTree();
+         eval.bind(evalContext, null);
+         outTuple.put(i, eval.eval(null));
+       }
+       boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
+       if (isInsert) {
+         InsertNode insertNode = rootNode.getChild();
+         insertNonFromQuery(queryContext, insertNode, responseBuilder);
+       } else {
+         Schema schema = PlannerUtil.targetToSchema(targets);
+         RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+         byte[] serializedBytes = encoder.toBytes(outTuple);
+         ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder();
+         serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
+         serializedResBuilder.setSchema(schema.getProto());
+         serializedResBuilder.setBytesNum(serializedBytes.length);
+ 
+         responseBuilder.setResultSet(serializedResBuilder);
+         responseBuilder.setMaxRowNum(1);
+         responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
 -        responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
++        responseBuilder.setResult(IPCUtil.buildOkRequestResult());
+       }
+     } finally {
+       // stop script executor
+       stopScriptExecutors(evalContext);
+     }
+   }
+ 
+   public static void startScriptExecutors(QueryContext queryContext, EvalContext evalContext, Target[] targets)
+       throws IOException {
      for (int i = 0; i < targets.length; i++) {
        EvalNode eval = targets[i].getEvalTree();
-       eval.bind(null);
-       outTuple.put(i, eval.eval(null));
+       if (eval instanceof GeneralFunctionEval) {
+         GeneralFunctionEval functionEval = (GeneralFunctionEval) eval;
+         if (functionEval.getFuncDesc().getInvocation().hasPython()) {
+           TajoScriptEngine scriptExecutor = new PythonScriptEngine(functionEval.getFuncDesc());
+           evalContext.addScriptEngine(eval, scriptExecutor);
+           scriptExecutor.start(queryContext.getConf());
+         }
+       }
      }
-     boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
-     if (isInsert) {
-       InsertNode insertNode = rootNode.getChild();
-       insertNonFromQuery(queryContext, insertNode, responseBuilder);
-     } else {
-       Schema schema = PlannerUtil.targetToSchema(targets);
-       RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
-       byte[] serializedBytes = encoder.toBytes(outTuple);
-       ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder();
-       serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
-       serializedResBuilder.setSchema(schema.getProto());
-       serializedResBuilder.setBytesNum(serializedBytes.length);
- 
-       responseBuilder.setResultSet(serializedResBuilder);
-       responseBuilder.setMaxRowNum(1);
-       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-       responseBuilder.setResult(IPCUtil.buildOkRequestResult());
++//<<<<<<< HEAD
++//    boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
++//    if (isInsert) {
++//      InsertNode insertNode = rootNode.getChild();
++//      insertNonFromQuery(queryContext, insertNode, responseBuilder);
++//    } else {
++//      Schema schema = PlannerUtil.targetToSchema(targets);
++//      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
++//      byte[] serializedBytes = encoder.toBytes(outTuple);
++//      ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder();
++//      serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
++//      serializedResBuilder.setSchema(schema.getProto());
++//      serializedResBuilder.setBytesNum(serializedBytes.length);
++//
++//      responseBuilder.setResultSet(serializedResBuilder);
++//      responseBuilder.setMaxRowNum(1);
++//      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
++//      responseBuilder.setResult(IPCUtil.buildOkRequestResult());
++//=======
+   }
+ 
+   public static void stopScriptExecutors(EvalContext evalContext) {
+     for (TajoScriptEngine executor : evalContext.getAllScriptEngines()) {
+       executor.shutdown();
      }
    }
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 7d95f8b,a983f78..5544d53
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@@ -135,10 -134,22 +134,25 @@@ public class Task 
    }
  
    public void initPlan() throws IOException {
-     plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan());
+     plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
 +    updateDescsForScanNodes(NodeType.SCAN);
 +    updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
 +    updateDescsForScanNodes(NodeType.INDEX_SCAN);
+     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
+     if (scanNode != null) {
+       for (LogicalNode node : scanNode) {
+         ScanNode scan = (ScanNode) node;
+         descs.put(scan.getCanonicalName(), scan.getTableDesc());
+       }
+     }
+ 
+     LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
+     if (partitionScanNode != null) {
+       for (LogicalNode node : partitionScanNode) {
+         PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
+         descs.put(scan.getCanonicalName(), scan.getTableDesc());
+       }
+     }
  
      interQuery = request.getProto().getInterQuery();
      if (interQuery) {
@@@ -178,19 -189,21 +192,32 @@@
      LOG.info("==================================");
    }
  
 +  private void updateDescsForScanNodes(NodeType nodeType) {
 +    assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
 +    LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
 +    if (scanNodes != null) {
 +      for (LogicalNode node : scanNodes) {
 +        ScanNode scanNode = (ScanNode) node;
 +        descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
 +      }
 +    }
 +  }
 +
+   private void startScriptExecutors() throws IOException {
+     for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+       executor.start(systemConf);
+     }
+   }
+ 
+   private void stopScriptExecutors() {
+     for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+       executor.shutdown();
+     }
+   }
+ 
    public void init() throws IOException {
      initPlan();
+     startScriptExecutors();
  
      if (context.getState() == TaskAttemptState.TA_PENDING) {
        // initialize a task temporal dir

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index c0b9deb,36ffd0c..6872ca5
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@@ -36,8 -36,10 +36,11 @@@ import org.apache.tajo.engine.codegen.T
  import org.apache.tajo.engine.function.FunctionLoader;
  import org.apache.tajo.engine.json.CoreGsonHelper;
  import org.apache.tajo.engine.parser.SQLAnalyzer;
 +import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.function.FunctionSignature;
+ import org.apache.tajo.master.exec.QueryExecutor;
  import org.apache.tajo.plan.*;
+ import org.apache.tajo.plan.expr.EvalContext;
  import org.apache.tajo.plan.expr.EvalNode;
  import org.apache.tajo.plan.serder.EvalNodeDeserializer;
  import org.apache.tajo.plan.serder.EvalNodeSerializer;

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
----------------------------------------------------------------------
diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
index 35e7a91,43a8618..89efdc8
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
@@@ -45,11 -45,10 +45,11 @@@ public class LogicalPlanEqualityTester 
    }
  
    @Override
 -  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
 +  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws PlanningException {
 +    LogicalPlan plan = context.getPlan();
      LogicalNode root = plan.getRootBlock().getRoot();
      PlanProto.LogicalNodeTree serialized = LogicalNodeSerializer.serialize(plan.getRootBlock().getRoot());
-     LogicalNode deserialized = LogicalNodeDeserializer.deserialize(context.getQueryContext(), serialized);
 -    LogicalNode deserialized = LogicalNodeDeserializer.deserialize(queryContext, null, serialized);
++    LogicalNode deserialized = LogicalNodeDeserializer.deserialize(context.getQueryContext(), null, serialized);
      assert root.deepEquals(deserialized);
      return plan;
    }

http://git-wip-us.apache.org/repos/asf/tajo/blob/882297e7/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------


Mime
View raw message