tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [4/4] tajo git commit: TAJO-1344: Python UDF support. (jihoon)
Date Sat, 18 Apr 2015 02:57:57 GMT
TAJO-1344: Python UDF support. (jihoon)

Closes #526


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a7453853
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a7453853
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a7453853

Branch: refs/heads/master
Commit: a7453853975dc8b5f4162ae16a1669aeb7ed995c
Parents: 14a1e53
Author: Jihoon Son <jihoonson@apache.org>
Authored: Sat Apr 18 11:56:47 2015 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Sat Apr 18 11:57:24 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   8 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |   3 +-
 .../org/apache/tajo/catalog/FunctionDesc.java   |   4 +-
 .../tajo/function/FunctionInvocation.java       |  24 +-
 .../org/apache/tajo/function/FunctionUtil.java  |  10 +
 .../tajo/function/PythonInvocationDesc.java     |  98 +++++
 .../src/main/proto/CatalogProtos.proto          |   6 +
 .../apache/tajo/catalog/TestFunctionDesc.java   |   4 +-
 .../tajo/catalog/store/AbstractDBStore.java     |   5 +-
 .../org/apache/tajo/catalog/store/MemStore.java |   1 -
 .../org/apache/tajo/catalog/TestCatalog.java    |   4 +-
 .../main/java/org/apache/tajo/QueryVars.java    |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +
 .../java/org/apache/tajo/datum/AnyDatum.java    |  82 +++++
 .../java/org/apache/tajo/datum/BlobDatum.java   |   5 +-
 .../org/apache/tajo/datum/DatumFactory.java     |  21 +-
 .../java/org/apache/tajo/json/DatumAdapter.java |   6 +-
 .../java/org/apache/tajo/storage/Tuple.java     |  54 +--
 .../java/org/apache/tajo/util/FileUtil.java     |  23 ++
 .../java/org/apache/tajo/util/KeyValueSet.java  |   2 +-
 .../tajo/util/datetime/DateTimeFormat.java      |   2 -
 .../tajo/engine/codegen/EvalCodeGenContext.java |   6 +-
 .../codegen/LegacyFunctionBindingEmitter.java   |   4 +-
 .../engine/codegen/VariablesPreBuilder.java     |   2 +-
 .../tajo/engine/function/FunctionLoader.java    |  59 ++-
 .../apache/tajo/engine/planner/Projector.java   |   2 +-
 .../rules/GlobalPlanEqualityTester.java         |   2 +-
 .../planner/physical/AggregationExec.java       |   2 +-
 .../planner/physical/BSTIndexScanExec.java      |   2 +-
 .../engine/planner/physical/CommonJoinExec.java |   2 +-
 .../DistinctGroupbyFirstAggregationExec.java    |   2 +-
 .../DistinctGroupbyHashAggregationExec.java     |   2 +-
 .../DistinctGroupbySecondAggregationExec.java   |   2 +-
 .../DistinctGroupbyThirdAggregationExec.java    |   2 +-
 .../engine/planner/physical/EvalExprExec.java   |   2 +-
 .../planner/physical/HashLeftOuterJoinExec.java |   4 +-
 .../engine/planner/physical/HavingExec.java     |   2 +-
 .../engine/planner/physical/SelectionExec.java  |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   4 +-
 .../engine/planner/physical/WindowAggExec.java  |   2 +-
 .../apache/tajo/engine/query/QueryContext.java  |   2 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   2 -
 .../java/org/apache/tajo/master/TajoMaster.java |  16 +-
 .../apache/tajo/master/exec/QueryExecutor.java  |  81 ++--
 .../java/org/apache/tajo/worker/TajoWorker.java |  11 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  35 +-
 .../apache/tajo/worker/TaskAttemptContext.java  |   7 +
 tajo-core/src/main/resources/python/__init__.py |  17 +
 .../src/main/resources/python/controller.py     | 330 +++++++++++++++++
 .../src/main/resources/python/tajo_util.py      | 103 ++++++
 .../org/apache/tajo/TajoTestingCluster.java     |   3 +
 .../apache/tajo/engine/eval/ExprTestBase.java   |  19 +-
 .../apache/tajo/engine/eval/TestEvalTree.java   |  68 ++--
 .../tajo/engine/eval/TestEvalTreeUtil.java      |  12 +-
 .../tajo/engine/function/TestMathFunctions.java |  10 +-
 .../engine/function/TestPythonFunctions.java    |  44 +++
 .../tajo/engine/query/TestGroupByQuery.java     |  14 +
 .../tajo/engine/query/TestSelectQuery.java      |  21 ++
 tajo-core/src/test/resources/python/__init__.py |  17 +
 .../src/test/resources/python/test_funcs.py     |  33 ++
 .../src/test/resources/python/test_funcs.pyc    | Bin 0 -> 1042 bytes
 .../src/test/resources/python/test_funcs2.py    |  32 ++
 .../testGroupbyWithPythonFunc.sql               |   1 +
 .../testGroupbyWithPythonFunc2.sql              |   1 +
 .../testNestedPythonFunction.sql                |   1 +
 .../TestSelectQuery/testSelectPythonFuncs.sql   |   2 +
 .../testSelectWithPredicateOnPythonFunc.sql     |   1 +
 .../testGroupbyWithPythonFunc.result            |   7 +
 .../testGroupbyWithPythonFunc2.result           |   7 +
 .../testNestedPythonFunction.result             |   7 +
 .../testSelectPythonFuncs.result                |   7 +
 .../testSelectWithPredicateOnPythonFunc.result  |  17 +
 tajo-docs/src/main/sphinx/functions.rst         |  70 +++-
 .../src/main/sphinx/functions/json_func.rst     |   1 +
 .../org/apache/tajo/plan/ExprAnnotator.java     |  26 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |   9 +-
 .../apache/tajo/plan/expr/AlgebraicUtil.java    |   6 +-
 .../tajo/plan/expr/BetweenPredicateEval.java    |  28 +-
 .../org/apache/tajo/plan/expr/CastEval.java     |  21 +-
 .../org/apache/tajo/plan/expr/EvalContext.java  |  45 +++
 .../org/apache/tajo/plan/expr/EvalNode.java     |   5 +-
 .../org/apache/tajo/plan/expr/EvalTreeUtil.java |   4 +-
 .../org/apache/tajo/plan/expr/FieldEval.java    |   4 +-
 .../org/apache/tajo/plan/expr/FunctionEval.java |   4 +-
 .../tajo/plan/expr/GeneralFunctionEval.java     |  56 +--
 .../plan/expr/PatternMatchPredicateEval.java    |   4 +-
 .../exprrewrite/EvalTreeOptimizationRule.java   |   3 +-
 .../plan/exprrewrite/EvalTreeOptimizer.java     |   1 +
 .../plan/exprrewrite/rules/ConstantFolding.java |  33 +-
 .../tajo/plan/function/FunctionInvoke.java      |  90 +++++
 .../plan/function/FunctionInvokeContext.java    |  74 ++++
 .../function/LegacyScalarFunctionInvoke.java    |  81 ++++
 .../plan/function/PythonFunctionInvoke.java     |  59 +++
 .../function/python/PythonScriptEngine.java     | 368 +++++++++++++++++++
 .../plan/function/python/TajoScriptEngine.java  |  83 +++++
 .../tajo/plan/function/stream/BufferPool.java   |  74 ++++
 .../function/stream/ByteBufInputChannel.java    |  71 ++++
 .../plan/function/stream/ByteBufLineReader.java | 176 +++++++++
 .../function/stream/CSVLineDeserializer.java    |  99 +++++
 .../tajo/plan/function/stream/CSVLineSerDe.java |  42 +++
 .../plan/function/stream/CSVLineSerializer.java | 118 ++++++
 .../stream/FieldSerializerDeserializer.java     |  36 ++
 .../function/stream/FieldSplitProcessor.java    |  34 ++
 .../tajo/plan/function/stream/InputHandler.java |  78 ++++
 .../function/stream/LineSplitProcessor.java     |  45 +++
 .../plan/function/stream/OutputHandler.java     | 156 ++++++++
 .../plan/function/stream/StreamingUtil.java     |  91 +++++
 .../stream/TextFieldSerializerDeserializer.java | 257 +++++++++++++
 .../function/stream/TextLineDeserializer.java   |  60 +++
 .../function/stream/TextLineParsingError.java   |  31 ++
 .../plan/function/stream/TextLineSerDe.java     |  65 ++++
 .../function/stream/TextLineSerializer.java     |  45 +++
 .../rules/LogicalPlanEqualityTester.java        |   2 +-
 .../rewrite/rules/PartitionedTableRewriter.java |   2 +-
 .../tajo/plan/serder/EvalNodeDeserializer.java  |  16 +-
 .../tajo/plan/serder/EvalNodeSerializer.java    |  10 +-
 .../plan/serder/LogicalNodeDeserializer.java    | 137 +++----
 tajo-plan/src/main/proto/Plan.proto             |  28 +-
 .../org/apache/tajo/storage/TestLazyTuple.java  |   2 +-
 .../testErrorTolerance1.json                    |  12 +-
 .../dataset/TestJsonSerDe/testVariousType.json  |   2 +-
 121 files changed, 3816 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4899ed2..2084737 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,14 +4,16 @@ Release 0.11.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-1344: Python UDF support. (jihoon)
+
+    TAJO-923: Add VAR_SAMP and VAR_POP window functions. 
+    (Contributed by Dongjoon Hyun, Committed by jihoon)
+
     TAJO-1494: Add SeekableScanner support to DelimitedTextFileScanner.
     (jinho)
 
     TAJO-921: Add STDDEV_SAMP and STDDEV_POP window functions. (Keuntae Park)
 
-    TAJO-923: Add VAR_SAMP and VAR_POP window functions. 
-    (Contributed by Dongjoon Hyun, Committed by jihoon)
-
     TAJO-1135: Implement queryable virtual table for cluster information.
     (jihun)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 628d710..dcfad8d 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -24,7 +24,6 @@ import org.apache.tajo.DataTypeUtil;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.common.TajoDataTypes;
@@ -418,7 +417,7 @@ public class CatalogUtil {
     //     (a)                    ()
     //    (a,b)                   (a)
 
-    int definedSize = definedTypes == null ? 0 : definedTypes.size();
+    int definedSize = definedTypes.size();
     int givenParamSize = givenTypes == null ? 0 : givenTypes.size();
     int paramDiff = givenParamSize - definedSize;
     if (paramDiff < 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
index 23d39f2..6ea6ac6 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
@@ -92,7 +92,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
    */
   public Function newInstance() throws InternalException {
     try {
-      Constructor<? extends Function> cons = getFuncClass().getConstructor();
+      Constructor<? extends Function> cons = getLegacyFuncClass().getConstructor();
       return cons.newInstance();
     } catch (Exception ioe) {
       throw new InternalException("Cannot initiate function " + signature);
@@ -124,7 +124,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
   ////////////////////////////////////////
 
   @SuppressWarnings("unchecked")
-  public Class<? extends Function> getFuncClass() {
+  public Class<? extends Function> getLegacyFuncClass() {
     return invocation.getLegacy().getFunctionClass();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
index 653bdb6..911d5dd 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
@@ -35,6 +35,8 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
   StaticMethodInvocationDesc scalarJIT;
   @Expose
   ClassBaseInvocationDesc<?> aggregationJIT;
+  @Expose
+  PythonInvocationDesc python;
 
   public FunctionInvocation() {
   }
@@ -55,6 +57,9 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
     if (proto.hasAggregationJIT()) {
       this.aggregationJIT = new ClassBaseInvocationDesc(proto.getAggregation());
     }
+    if (proto.hasPython()) {
+      this.python = new PythonInvocationDesc(proto.getPython());
+    }
   }
 
   public boolean isAvailable() {
@@ -121,6 +126,18 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
     return aggregationJIT;
   }
 
+  public boolean hasPython() {
+    return python != null;
+  }
+
+  public void setPython(PythonInvocationDesc python) {
+    this.python = python;
+  }
+
+  public PythonInvocationDesc getPython() {
+    return python;
+  }
+
   @Override
   public FunctionInvocationProto getProto() {
     FunctionInvocationProto.Builder builder = FunctionInvocationProto.newBuilder();
@@ -139,16 +156,19 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
     if (hasAggregationJIT()) {
       builder.setAggregationJIT(aggregationJIT.getProto());
     }
+    if (hasPython()) {
+      builder.setPython(python.getProto());
+    }
     return builder.build();
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(legacy, scalar, scalarJIT);
+    return Objects.hashCode(legacy, scalar, scalarJIT, python);
   }
 
   public String toString() {
     return "legacy=" + hasLegacy() + ",scalar=" + hasScalar() + ",agg=" + hasAggregation() +
-        ",scalarJIT=" + hasScalarJIT() + ",aggJIT=" + hasAggregationJIT();
+        ",scalarJIT=" + hasScalarJIT() + ",aggJIT=" + hasAggregationJIT() + ",python=" + hasPython();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java
index dee5d1c..ef70428 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.function;
 
+import org.apache.tajo.catalog.FunctionDesc;
+
 import java.util.Collection;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
@@ -53,4 +55,12 @@ public class FunctionUtil {
   public static boolean isNullableParam(Class<?> clazz) {
     return !clazz.isPrimitive();
   }
+
+  public static boolean isLegacyFunction(FunctionDesc desc) {
+    return desc.getInvocation().hasLegacy();
+  }
+
+  public static boolean isScriptFunction(FunctionDesc desc) {
+    return desc.getInvocation().hasPython();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java
new file mode 100644
index 0000000..160b169
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.function;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.proto.CatalogProtos.PythonInvocationDescProto;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * <code>PythonInvocationDesc</code> describes a function name
+ * and a file path to the script where the function is defined.
+ */
+public class PythonInvocationDesc implements ProtoObject<PythonInvocationDescProto>, Cloneable {
+  @Expose private String funcName;
+  @Expose private String filePath;
+
+  public PythonInvocationDesc() {
+
+  }
+
+  public PythonInvocationDesc(String funcName, String filePath) {
+    this.funcName = funcName;
+    this.filePath = filePath;
+  }
+
+  public void setFuncName(String funcName) {
+    this.funcName = funcName;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public PythonInvocationDesc(PythonInvocationDescProto proto) {
+    this(proto.getFuncName(), proto.getFilePath());
+  }
+
+  public String getName() {
+    return funcName;
+  }
+
+  public String getPath() {
+    return filePath;
+  }
+
+  @Override
+  public PythonInvocationDescProto getProto() {
+    PythonInvocationDescProto.Builder builder = PythonInvocationDescProto.newBuilder();
+    builder.setFuncName(funcName).setFilePath(filePath);
+    return builder.build();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof PythonInvocationDesc) {
+      PythonInvocationDesc other = (PythonInvocationDesc) o;
+      return TUtil.checkEquals(funcName, other.funcName) &&
+          TUtil.checkEquals(filePath, other.filePath);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(funcName, filePath);
+  }
+
+  @Override
+  public String toString() {
+    return funcName + " at " + filePath;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    PythonInvocationDesc clone = (PythonInvocationDesc) super.clone();
+    clone.funcName = funcName == null ? null : funcName;
+    clone.filePath = filePath == null ? null : filePath;
+    return clone;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 1402bbc..fd2cb19 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -411,6 +411,7 @@ message FunctionInvocationProto {
   optional ClassBaseInvocationDescProto aggregation = 3;
   optional StaticMethodInvocationDescProto scalarJIT = 4;
   optional ClassBaseInvocationDescProto aggregationJIT = 5;
+  optional PythonInvocationDescProto python = 6;
 }
 
 message ClassBaseInvocationDescProto {
@@ -423,3 +424,8 @@ message StaticMethodInvocationDescProto {
   required string returnClass = 3;
   repeated string paramClasses = 4;
 }
+
+message PythonInvocationDescProto {
+  required string funcName = 1;
+  required string filePath = 2;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
index 92d2aa4..4a67ce6 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
@@ -68,7 +68,7 @@ public class TestFunctionDesc {
     desc.setDetail("detail");
 
     assertEquals("sum", desc.getFunctionName());
-    assertEquals(TestSum.class, desc.getFuncClass());
+    assertEquals(TestSum.class, desc.getLegacyFuncClass());
     assertEquals(FunctionType.GENERAL, desc.getFuncType());
     assertEquals(Type.INT4, desc.getReturnType().getType());
     assertArrayEquals(CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8),
@@ -84,7 +84,7 @@ public class TestFunctionDesc {
     FunctionDesc newDesc = new FunctionDesc(proto);
 
     assertEquals("sum", newDesc.getFunctionName());
-    assertEquals(TestSum.class, newDesc.getFuncClass());
+    assertEquals(TestSum.class, newDesc.getLegacyFuncClass());
     assertEquals(FunctionType.GENERAL, newDesc.getFuncType());
     assertEquals(Type.INT4, newDesc.getReturnType().getType());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 1773033..b10e5a5 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -62,8 +62,6 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
   private Connection conn;
   
-  protected Map<String, Boolean> baseTableMaps = new HashMap<String, Boolean>();
-  
   protected XMLCatalogSchemaManager catalogSchemaManager;
 
   protected abstract String getCatalogDriverName();
@@ -1266,11 +1264,11 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
       conn = getConnection();
       pstmt = conn.prepareStatement(ADD_PARTITION_SQL);
-
       pstmt.setInt(1, tableId);
       pstmt.setString(2, partition.getPartitionName());
       pstmt.setString(3, partition.getPath());
       pstmt.executeUpdate();
+      pstmt.close();
 
       if (partition.getPartitionKeysCount() > 0) {
         pstmt = conn.prepareStatement(ADD_PARTITION_KEYS_SQL);
@@ -1349,6 +1347,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       pstmt = conn.prepareStatement(sqlDeletePartitionKeys);
       pstmt.setInt(1, partitionId);
       pstmt.executeUpdate();
+      pstmt.close();
 
       pstmt = conn.prepareStatement(sqlDeletePartition);
       pstmt.setInt(1, partitionId);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index b058504..821b00c 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -30,7 +30,6 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 0a2a8cc..306f581 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -666,7 +666,7 @@ public class TestCatalog {
     FunctionDesc retrived = catalog.getFunction("test10", CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.BLOB));
 
     assertEquals(retrived.getFunctionName(), "test10");
-    assertEquals(retrived.getFuncClass(), TestFunc2.class);
+    assertEquals(retrived.getLegacyFuncClass(), TestFunc2.class);
     assertEquals(retrived.getFuncType(), FunctionType.GENERAL);
 
     assertFalse(catalog.containFunction("test10", CatalogUtil.newSimpleDataTypeArray(Type.BLOB, Type.INT4)));
@@ -685,7 +685,7 @@ public class TestCatalog {
 		FunctionDesc retrived = catalog.getFunction("test2", CatalogUtil.newSimpleDataTypeArray(Type.INT4));
 
 		assertEquals(retrived.getFunctionName(),"test2");
-		assertEquals(retrived.getFuncClass(),TestFunc1.class);
+		assertEquals(retrived.getLegacyFuncClass(),TestFunc1.class);
 		assertEquals(retrived.getFuncType(),FunctionType.UDF);
 	}
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
index ba76d63..55ca700 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 8e2b3d2..bfba290 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -293,6 +293,10 @@ public class TajoConf extends Configuration {
     // Geo IP
     GEOIP_DATA("tajo.function.geoip-database-location", ""),
 
+    // Python UDF
+    PYTHON_CODE_DIR("tajo.function.python.code-dir", ""),
+    PYTHON_CONTROLLER_LOG_DIR("tajo.function.python.controller.log-dir", ""),
+
     /////////////////////////////////////////////////////////////////////////////////
     // User Session Configuration
     //

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java
new file mode 100644
index 0000000..0771a6e
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/AnyDatum.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.datum;
+
+import com.google.gson.annotations.Expose;
+
+import static org.apache.tajo.common.TajoDataTypes.Type.ANY;
+
+/**
+ * <code>AnyDatum</code> can contain any types of datum.
+ */
+public class AnyDatum extends Datum {
+  @Expose Datum val;
+
+  public AnyDatum(Datum val) {
+    super(ANY);
+    this.val = val;
+  }
+
+  public Datum getActual() {
+    return val;
+  }
+
+  @Override
+  public int size() {
+    return this.val.size();
+  }
+
+  @Override
+  public int hashCode() {
+    return val.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof AnyDatum) {
+      AnyDatum other = (AnyDatum) obj;
+      return val.equals(other.val);
+    }
+    return false;
+  }
+
+  @Override
+  public Datum equalsTo(Datum datum) {
+    if (datum.type() == ANY) {
+      AnyDatum other = (AnyDatum) datum;
+      return val.equalsTo(other.val);
+    }
+    return DatumFactory.createBool(false);
+  }
+
+  @Override
+  public int compareTo(Datum datum) {
+    if (datum.type() == ANY) {
+      AnyDatum other = (AnyDatum) datum;
+      return val.compareTo(other.val);
+    }
+    // Any datums will be lastly appeared.
+    return 1;
+  }
+
+  @Override
+  public String toString() {
+    return val.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
index 4f296a1..cf190e2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/BlobDatum.java
@@ -23,6 +23,7 @@ package org.apache.tajo.datum;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.exception.InvalidOperationException;
+import org.apache.tajo.util.TUtil;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
@@ -130,8 +131,8 @@ public class BlobDatum extends Datum {
       BlobDatum other = (BlobDatum) obj;
       initFromBytes();
       other.initFromBytes();
-      return bb.equals(other.bb);
-    }
+			return Arrays.equals(this.val, other.val);
+		}
     
     return false;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
index 9f48cad..bd1b88f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.datum;
 
 import com.google.protobuf.Message;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.exception.InvalidCastException;
@@ -66,7 +67,7 @@ public class DatumFactory {
       case INET4:
         return Inet4Datum.class;
       case ANY:
-        return NullDatum.class;
+        return AnyDatum.class;
       case NULL_TYPE:
         return NullDatum.class;
       default:
@@ -377,16 +378,16 @@ public class DatumFactory {
     return new TimestampDatum(DateTimeUtil.toJulianTimestampWithTZ(str, tz));
   }
 
-  public static BlobDatum createBlob(byte[] val) {
-    return new BlobDatum(val);
+  public static BlobDatum createBlob(byte[] encoded) {
+    return new BlobDatum(encoded);
   }
 
-  public static BlobDatum createBlob(byte[] val, int offset, int length) {
-    return new BlobDatum(val, offset, length);
+  public static BlobDatum createBlob(byte[] encoded, int offset, int length) {
+    return new BlobDatum(encoded, offset, length);
   }
 
-  public static BlobDatum createBlob(String val) {
-    return new BlobDatum(val.getBytes());
+  public static BlobDatum createBlob(String plainString) {
+    return new BlobDatum(Base64.encodeBase64(plainString.getBytes()));
   }
 
   public static Inet4Datum createInet4(int encoded) {
@@ -405,6 +406,10 @@ public class DatumFactory {
     return new Inet4Datum(val);
   }
 
+  public static AnyDatum createAny(Datum val) {
+    return new AnyDatum(val);
+  }
+
   public static Datum cast(Datum operandDatum, DataType target, @Nullable TimeZone tz) {
     switch (target.getType()) {
     case BOOLEAN:
@@ -454,6 +459,8 @@ public class DatumFactory {
       return DatumFactory.createBlob(operandDatum.asByteArray());
     case INET4:
       return DatumFactory.createInet4(operandDatum.asByteArray());
+    case ANY:
+      return DatumFactory.createAny(operandDatum);
     default:
       throw new InvalidCastException(operandDatum.type(), target.getType());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java b/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java
index d65559d..b9d8aef 100644
--- a/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java
+++ b/tajo-common/src/main/java/org/apache/tajo/json/DatumAdapter.java
@@ -41,8 +41,9 @@ public class DatumAdapter implements GsonSerDerAdapter<Datum> {
       return new TimestampDatum(CommonGsonHelper.getOrDie(jsonObject, "value").getAsLong());
     case INTERVAL:
       String[] values = CommonGsonHelper.getOrDie(jsonObject, "value").getAsString().split(",");
-
       return new IntervalDatum(Integer.parseInt(values[0]), Long.parseLong(values[1]));
+    case ANY:
+      return new AnyDatum(deserialize(CommonGsonHelper.getOrDie(jsonObject, "actual"), typeOfT, context));
     default:
       return context.deserialize(CommonGsonHelper.getOrDie(jsonObject, "body"),
           DatumFactory.getDatumClass(TajoDataTypes.Type.valueOf(typeName)));
@@ -67,6 +68,9 @@ public class DatumAdapter implements GsonSerDerAdapter<Datum> {
       IntervalDatum interval = (IntervalDatum)src;
       jsonObj.addProperty("value", interval.getMonths() + "," + interval.getMilliSeconds());
       break;
+    case ANY:
+      jsonObj.add("actual", serialize(((AnyDatum) src).getActual(), typeOfSrc, context));
+      break;
     default:
       jsonObj.add("body", context.serialize(src));
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
index 1ba1926..aec784f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -22,58 +22,58 @@ import org.apache.tajo.datum.Datum;
 
 public interface Tuple extends Cloneable {
   
-	public int size();
+	int size();
 	
-	public boolean contains(int fieldid);
+	boolean contains(int fieldid);
 
-  public boolean isNull(int fieldid);
+  boolean isNull(int fieldid);
 
   @SuppressWarnings("unused")
-  public boolean isNotNull(int fieldid);
+  boolean isNotNull(int fieldid);
 	
-	public void clear();
+	void clear();
 	
-	public void put(int fieldId, Datum value);
+	void put(int fieldId, Datum value);
 
-  public void put(int fieldId, Datum[] values);
+  void put(int fieldId, Datum[] values);
 
-  public void put(int fieldId, Tuple tuple);
+  void put(int fieldId, Tuple tuple);
 	
-	public void put(Datum[] values);
+	void put(Datum[] values);
 	
-	public Datum get(int fieldId);
+	Datum get(int fieldId);
 	
-	public void setOffset(long offset);
+	void setOffset(long offset);
 	
-	public long getOffset();
+	long getOffset();
 
-	public boolean getBool(int fieldId);
+	boolean getBool(int fieldId);
 
-	public byte getByte(int fieldId);
+	byte getByte(int fieldId);
 
-  public char getChar(int fieldId);
+  char getChar(int fieldId);
 	
-	public byte [] getBytes(int fieldId);
+	byte [] getBytes(int fieldId);
 	
-	public short getInt2(int fieldId);
+	short getInt2(int fieldId);
 	
-	public int getInt4(int fieldId);
+	int getInt4(int fieldId);
 	
-	public long getInt8(int fieldId);
+	long getInt8(int fieldId);
 	
-	public float getFloat4(int fieldId);
+	float getFloat4(int fieldId);
 	
-	public double getFloat8(int fieldId);
+	double getFloat8(int fieldId);
 	
-	public String getText(int fieldId);
+	String getText(int fieldId);
 
-  public Datum getProtobufDatum(int fieldId);
+  Datum getProtobufDatum(int fieldId);
 
-  public Datum getInterval(int fieldId);
+  Datum getInterval(int fieldId);
 
-  public char [] getUnicodeChars(int fieldId);
+  char [] getUnicodeChars(int fieldId);
 
-  public Tuple clone() throws CloneNotSupportedException;
+  Tuple clone() throws CloneNotSupportedException;
 
-  public Datum[] getValues();
+  Datum[] getValues();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
index 9403a2f..3e3d3a2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.util;
 
 import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.IOUtils;
 
@@ -143,4 +144,26 @@ public class FileUtil {
   public static boolean isLocalPath(Path path) {
     return path.toUri().getScheme().equals("file");
   }
+
+
+  /**
+   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+   * null pointers. Must only be used for cleanup in exception handlers.
+   *
+   * @param log the log to record problems to at debug level. Can be null.
+   * @param closeables the objects to close
+   */
+  public static void cleanup(Log log, java.io.Closeable... closeables) {
+    for (java.io.Closeable c : closeables) {
+      if (c != null) {
+        try {
+          c.close();
+        } catch(IOException e) {
+          if (log != null && log.isDebugEnabled()) {
+            log.debug("Exception in closing " + c, e);
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 6af0c9e..5dba9e2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -104,7 +104,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
     } else if (defaultVal != null) {
       return defaultVal;
     } else {
-      throw new IllegalArgumentException("No such a config key: "  + key);
+      throw new IllegalArgumentException("No such config key: "  + key);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java
index c3aa71e..798b9c5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeFormat.java
@@ -477,7 +477,6 @@ public class DateTimeFormat {
       KeyWord keyword = new KeyWord();
       keyword.name = (String)eachKeywordValue[0];
       keyword.len = ((Integer)eachKeywordValue[1]).intValue();
-      keyword.id = ((DCH_poz)eachKeywordValue[2]).getValue();
       keyword.idType = ((DCH_poz)eachKeywordValue[2]);
       keyword.is_digit = ((Boolean)eachKeywordValue[3]).booleanValue();
       keyword.date_mode = (FromCharDateMode)eachKeywordValue[4];
@@ -513,7 +512,6 @@ public class DateTimeFormat {
   static class KeyWord {
     String name;
     int len;
-    int id;
     DCH_poz idType;
     boolean is_digit;
     FromCharDateMode date_mode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
index c8197b7..32fb562 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
@@ -174,7 +174,7 @@ public class EvalCodeGenContext extends TajoGeneratorAdapter {
 
       } else if (entry.getKey().getType() == EvalType.FUNCTION) {
         GeneralFunctionEval function = (GeneralFunctionEval) entry.getKey();
-        final String internalName = TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getFuncClass());
+        final String internalName = TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getLegacyFuncClass());
 
         // new and initialization of function
         initMethod.visitTypeInsn(Opcodes.NEW, internalName);
@@ -198,12 +198,12 @@ public class EvalCodeGenContext extends TajoGeneratorAdapter {
 
         initMethod.visitVarInsn(Opcodes.ALOAD, FUNCTION);
         consAdapter.aload(PARAM_TYPE_ARRAY);
-        consAdapter.invokeVirtual(function.getFuncDesc().getFuncClass(), "init", void.class, new Class[] {FunctionEval.ParamType[].class});
+        consAdapter.invokeVirtual(function.getFuncDesc().getLegacyFuncClass(), "init", void.class, new Class[] {FunctionEval.ParamType[].class});
 
         initMethod.visitVarInsn(Opcodes.ALOAD, 0);
         initMethod.visitVarInsn(Opcodes.ALOAD, FUNCTION);
         initMethod.visitFieldInsn(Opcodes.PUTFIELD, this.owner, entry.getValue(),
-            "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getFuncClass()) + ";");
+            "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getLegacyFuncClass()) + ";");
 
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java
index 36dfe4f..6b47149 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/LegacyFunctionBindingEmitter.java
@@ -63,12 +63,12 @@ public class LegacyFunctionBindingEmitter {
     FunctionDesc desc = func.getFuncDesc();
 
     String fieldName = context.symbols.get(func);
-    String funcDescName = "L" + TajoGeneratorAdapter.getInternalName(desc.getFuncClass()) + ";";
+    String funcDescName = "L" + TajoGeneratorAdapter.getInternalName(desc.getLegacyFuncClass()) + ";";
 
     context.aload(0);
     context.methodvisitor.visitFieldInsn(Opcodes.GETFIELD, context.owner, fieldName, funcDescName);
     context.aload(TUPLE);
-    context.invokeVirtual(desc.getFuncClass(), "eval", Datum.class, new Class[] {Tuple.class});
+    context.invokeVirtual(desc.getLegacyFuncClass(), "eval", Datum.class, new Class[] {Tuple.class});
 
     context.convertToPrimitive(func.getValueType());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java
index a055b04..95ec371 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/VariablesPreBuilder.java
@@ -75,7 +75,7 @@ class VariablesPreBuilder extends SimpleEvalNodeVisitor<EvalCodeGenContext> {
       String fieldName = function.getFuncDesc().getFunctionName() + "_" + context.seqId++;
       context.symbols.put(function, fieldName);
       context.classWriter.visitField(Opcodes.ACC_PRIVATE, fieldName,
-          "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getFuncClass()) + ";", null, null);
+          "L" + TajoGeneratorAdapter.getInternalName(function.getFuncDesc().getLegacyFuncClass()) + ";", null, null);
     }
 
     return function;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
index 3b3e7c7..6061d1b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
@@ -25,16 +25,24 @@ import com.google.common.collect.Sets;
 import org.apache.commons.collections.Predicate;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.function.annotation.Description;
 import org.apache.tajo.engine.function.annotation.ParamOptionTypes;
 import org.apache.tajo.engine.function.annotation.ParamTypes;
 import org.apache.tajo.function.*;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
 import org.apache.tajo.util.ClassUtil;
+import org.apache.tajo.util.TUtil;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.*;
@@ -44,8 +52,14 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.GENERAL;
 public class FunctionLoader {
 
   private static Log LOG = LogFactory.getLog(FunctionLoader.class);
+  public static final String PYTHON_FUNCTION_NAMESPACE = "python";
 
-  public static Collection<FunctionDesc> load() {
+  /**
+   * Load built-in functions
+   *
+   * @return
+   */
+  public static Map<FunctionSignature, FunctionDesc> load() {
     Map<FunctionSignature, FunctionDesc> map = Maps.newHashMap();
 
     List<FunctionDesc> dd = Lists.newArrayList();
@@ -66,7 +80,48 @@ public class FunctionLoader {
       }
     }
 
-    return map.values();
+    return map;
+  }
+
+  /**
+   * Load functions that are defined by users.
+   *
+   * @param conf
+   * @param functionMap
+   * @return
+   * @throws IOException
+   */
+  public static Map<FunctionSignature, FunctionDesc> loadUserDefinedFunctions(TajoConf conf,
+                                                                              Map<FunctionSignature, FunctionDesc> functionMap)
+      throws IOException {
+
+    String[] codePaths = conf.getStrings(TajoConf.ConfVars.PYTHON_CODE_DIR.varname);
+    if (codePaths != null) {
+      FileSystem localFS = FileSystem.getLocal(conf);
+      for (String codePathStr : codePaths) {
+        Path codePath = new Path(codePathStr);
+        List<Path> filePaths = TUtil.newList();
+        if (localFS.isDirectory(codePath)) {
+          for (FileStatus file : localFS.listStatus(codePath, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+              return path.getName().endsWith(PythonScriptEngine.FILE_EXTENSION);
+            }
+          })) {
+            filePaths.add(file.getPath());
+          }
+        } else {
+          filePaths.add(codePath);
+        }
+        for (Path filePath : filePaths) {
+          for (FunctionDesc f : PythonScriptEngine.registerFunctions(filePath.toUri(),
+              FunctionLoader.PYTHON_FUNCTION_NAMESPACE)) {
+            functionMap.put(f.getSignature(), f);
+          }
+        }
+      }
+    }
+    return functionMap;
   }
 
   public static Set<FunctionDesc> findScalarFunctions() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
index cec1862..a73478f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -63,7 +63,7 @@ public class Projector {
 
   public void init() {
     for (EvalNode eval : evals) {
-      eval.bind(inSchema);
+      eval.bind(context.getEvalContext(), inSchema);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
index e2fd47f..e55a258 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
@@ -51,7 +51,7 @@ public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule {
         LogicalNode node = eb.getPlan();
         if (node != null) {
           PlanProto.LogicalNodeTree tree = LogicalNodeSerializer.serialize(node);
-          LogicalNode deserialize = LogicalNodeDeserializer.deserialize(plan.getContext(), tree);
+          LogicalNode deserialize = LogicalNodeDeserializer.deserialize(plan.getContext(), null, tree);
           assert node.deepEquals(deserialize);
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index 6d9e38a..4b53b39 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -63,7 +63,7 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
   public void init() throws IOException {
     super.init();
     for (EvalNode aggFunction : aggFunctions) {
-      aggFunction.bind(inSchema);
+      aggFunction.bind(context.getEvalContext(), inSchema);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index be6c046..806d34c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -72,7 +72,7 @@ public class BSTIndexScanExec extends PhysicalExec {
     super.init();
     progress = 0.0f;
     if (qual != null) {
-      qual.bind(inSchema);
+      qual.bind(context.getEvalContext(), inSchema);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
index 0781041..2535edf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
@@ -54,7 +54,7 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
   public void init() throws IOException {
     super.init();
     if (hasJoinQual) {
-      joinQual.bind(inSchema);
+      joinQual.bind(context.getEvalContext(), inSchema);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
index 37bc5a7..94429a0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -250,7 +250,7 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
       }
 
       for (AggregationFunctionCallEval eachFunction: aggFunctions) {
-        eachFunction.bind(inSchema);
+        eachFunction.bind(context.getEvalContext(), inSchema);
         eachFunction.setFirstPhase();
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
index e96e750..0f25d6c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
@@ -375,7 +375,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
       }
 
       for (EvalNode aggFunction : aggFunctions) {
-        aggFunction.bind(schema);
+        aggFunction.bind(context.getEvalContext(), schema);
       }
 
       tupleSize = groupingKeyIds.length + aggFunctionsNum;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
index 7b01a9b..b394390 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -119,7 +119,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
         nonDistinctAggrFunctions = eachGroupby.getAggFunctions();
         if (nonDistinctAggrFunctions != null) {
           for (AggregationFunctionCallEval eachFunction: nonDistinctAggrFunctions) {
-            eachFunction.bind(inSchema);
+            eachFunction.bind(context.getEvalContext(), inSchema);
             eachFunction.setIntermediatePhase();
           }
           nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length];

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
index 7bd71e2..e71976c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -252,7 +252,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
       aggrFunctions = groupbyNode.getAggFunctions();
       if (aggrFunctions != null) {
         for (AggregationFunctionCallEval eachFunction: aggrFunctions) {
-          eachFunction.bind(inSchema);
+          eachFunction.bind(context.getEvalContext(), inSchema);
           eachFunction.setFinalPhase();
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index 32ec772..4581b4a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -41,7 +41,7 @@ public class EvalExprExec extends PhysicalExec {
     super.init();
     progress = 0.0f;
     for (Target target : plan.getTargets()) {
-      target.getEvalTree().bind(inSchema);
+      target.getEvalTree().bind(context.getEvalContext(), inSchema);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index fa9ba94..6f573d0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -121,9 +121,9 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
 
     rightNumCols = rightChild.getSchema().size();
 
-    joinQual.bind(inSchema);
+    joinQual.bind(context.getEvalContext(), inSchema);
     if (joinFilter != null) {
-      joinFilter.bind(inSchema);
+      joinFilter.bind(context.getEvalContext(), inSchema);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index b71c770..6897e92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -39,7 +39,7 @@ public class HavingExec extends UnaryPhysicalExec  {
   @Override
   public void init() throws IOException {
     super.init();
-    qual.bind(inSchema);
+    qual.bind(context.getEvalContext(), inSchema);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index c090fa7..7f5bbe9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -39,7 +39,7 @@ public class SelectionExec extends UnaryPhysicalExec  {
   @Override
   public void init() throws IOException {
     super.init();
-    qual.bind(inSchema);
+    qual.bind(context.getEvalContext(), inSchema);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 671555c..ff9477f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -109,7 +109,7 @@ public class SeqScanExec extends ScanExec {
       FieldEval targetExpr = new FieldEval(column);
       Datum datum = NullDatum.get();
       if (partitionRow != null) {
-        targetExpr.bind(columnPartitionSchema);
+        targetExpr.bind(context.getEvalContext(), columnPartitionSchema);
         datum = targetExpr.eval(partitionRow);
       }
       ConstEval constExpr = new ConstEval(datum);
@@ -163,7 +163,7 @@ public class SeqScanExec extends ScanExec {
     super.init();
 
     if (plan.hasQual()) {
-      qual.bind(inSchema);
+      qual.bind(context.getEvalContext(), inSchema);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index 2f1fc46..05b0418 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -183,7 +183,7 @@ public class WindowAggExec extends UnaryPhysicalExec {
   public void init() throws IOException {
     super.init();
     for (EvalNode functionEval : functions) {
-      functionEval.bind(inSchema);
+      functionEval.bind(context.getEvalContext(), inSchema);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 7b3c00d..ee50221 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -25,8 +25,8 @@ import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.session.Session;
 
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 08403ff..074f34e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -44,7 +44,6 @@ import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.exec.DDLExecutor;
 import org.apache.tajo.master.exec.QueryExecutor;
-import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.logical.InsertNode;
@@ -77,7 +76,6 @@ public class GlobalEngine extends AbstractService {
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
   private LogicalPlanVerifier annotatedPlanVerifier;
-  private DistributedQueryHookManager hookManager;
 
   private QueryExecutor queryExecutor;
   private DDLExecutor ddlExecutor;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 51f82f8..0a5de58 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -21,10 +21,7 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.service.CompositeService;
@@ -36,10 +33,12 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tajo.catalog.CatalogServer;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.LocalCatalogWrapper;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.FunctionLoader;
+import org.apache.tajo.function.FunctionSignature;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
@@ -68,7 +67,9 @@ import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
@@ -177,7 +178,7 @@ public class TajoMaster extends CompositeService {
       diagnoseTajoMaster();
       this.storeManager = StorageManager.getFileStorageManager(systemConf);
 
-      catalogServer = new CatalogServer(FunctionLoader.load());
+      catalogServer = new CatalogServer(loadFunctions());
       addIfService(catalogServer);
       catalog = new LocalCatalogWrapper(catalogServer, systemConf);
 
@@ -207,6 +208,11 @@ public class TajoMaster extends CompositeService {
     LOG.info("Tajo Master is initialized.");
   }
 
+  private Collection<FunctionDesc> loadFunctions() throws IOException {
+    Map<FunctionSignature, FunctionDesc> functionMap = FunctionLoader.load();
+    return FunctionLoader.loadUserDefinedFunctions(systemConf, functionMap).values();
+  }
+
   private void initSystemMetrics() {
     systemMetrics = new TajoSystemMetrics(systemConf, METRICS_GROUP_NAME, getMasterName());
     systemMetrics.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2eb3c5f..ad1a8e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -47,6 +47,10 @@ import org.apache.tajo.master.*;
 import org.apache.tajo.master.exec.prehook.CreateTableHook;
 import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
 import org.apache.tajo.master.exec.prehook.InsertIntoHook;
+import org.apache.tajo.plan.expr.EvalContext;
+import org.apache.tajo.plan.expr.GeneralFunctionEval;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.querymaster.*;
 import org.apache.tajo.session.Session;
@@ -113,11 +117,9 @@ public class QueryExecutor {
     } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
       execSimpleQuery(queryContext, session, sql, plan, response);
 
-
       // NonFromQuery indicates a form of 'select a, x+y;'
     } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
-      execNonFromQuery(queryContext, session, sql, plan, response);
-
+      execNonFromQuery(queryContext, plan, response);
 
     } else { // it requires distributed execution. So, the query is forwarded to a query master.
       executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
@@ -263,37 +265,66 @@ public class QueryExecutor {
     response.setResultCode(ClientProtos.ResultCode.OK);
   }
 
-  public void execNonFromQuery(QueryContext queryContext, Session session, String query,
-                               LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) throws Exception {
+  public void execNonFromQuery(QueryContext queryContext, LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder)
+      throws Exception {
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
+    EvalContext evalContext = new EvalContext();
     Target[] targets = plan.getRootBlock().getRawTargets();
     if (targets == null) {
       throw new PlanningException("No targets");
     }
-    final Tuple outTuple = new VTuple(targets.length);
+    try {
+      // start script executor
+      startScriptExecutors(queryContext, evalContext, targets);
+      final Tuple outTuple = new VTuple(targets.length);
+      for (int i = 0; i < targets.length; i++) {
+        EvalNode eval = targets[i].getEvalTree();
+        eval.bind(evalContext, null);
+        outTuple.put(i, eval.eval(null));
+      }
+      boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
+      if (isInsert) {
+        InsertNode insertNode = rootNode.getChild();
+        insertNonFromQuery(queryContext, insertNode, responseBuilder);
+      } else {
+        Schema schema = PlannerUtil.targetToSchema(targets);
+        RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+        byte[] serializedBytes = encoder.toBytes(outTuple);
+        ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder();
+        serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
+        serializedResBuilder.setSchema(schema.getProto());
+        serializedResBuilder.setBytesNum(serializedBytes.length);
+
+        responseBuilder.setResultSet(serializedResBuilder);
+        responseBuilder.setMaxRowNum(1);
+        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+        responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      }
+    } finally {
+      // stop script executor
+      stopScriptExecutors(evalContext);
+    }
+  }
+
+  public static void startScriptExecutors(QueryContext queryContext, EvalContext evalContext, Target[] targets)
+      throws IOException {
     for (int i = 0; i < targets.length; i++) {
       EvalNode eval = targets[i].getEvalTree();
-      eval.bind(null);
-      outTuple.put(i, eval.eval(null));
+      if (eval instanceof GeneralFunctionEval) {
+        GeneralFunctionEval functionEval = (GeneralFunctionEval) eval;
+        if (functionEval.getFuncDesc().getInvocation().hasPython()) {
+          TajoScriptEngine scriptExecutor = new PythonScriptEngine(functionEval.getFuncDesc());
+          evalContext.addScriptEngine(eval, scriptExecutor);
+          scriptExecutor.start(queryContext.getConf());
+        }
+      }
     }
-    boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
-    if (isInsert) {
-      InsertNode insertNode = rootNode.getChild();
-      insertNonFromQuery(queryContext, insertNode, responseBuilder);
-    } else {
-      Schema schema = PlannerUtil.targetToSchema(targets);
-      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
-      byte[] serializedBytes = encoder.toBytes(outTuple);
-      ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder();
-      serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
-      serializedResBuilder.setSchema(schema.getProto());
-      serializedResBuilder.setBytesNum(serializedBytes.length);
-
-      responseBuilder.setResultSet(serializedResBuilder);
-      responseBuilder.setMaxRowNum(1);
-      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+  }
+
+  public static void stopScriptExecutors(EvalContext evalContext) {
+    for (TajoScriptEngine executor : evalContext.getAllScriptEngines()) {
+      executor.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 1d0293b..17af71a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -33,7 +33,10 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.function.FunctionLoader;
+import org.apache.tajo.function.FunctionSignature;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.service.TajoMasterInfo;
@@ -51,10 +54,7 @@ import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.JvmPauseMonitor;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.*;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
@@ -65,6 +65,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -235,6 +236,8 @@ public class TajoWorker extends CompositeService {
     taskHistoryWriter.init(conf);
 
     historyReader = new HistoryReader(workerContext.getWorkerName(), this.systemConf);
+
+    FunctionLoader.loadUserDefinedFunctions(systemConf, new HashMap<FunctionSignature, FunctionDesc>());
     
     diagnoseTajoWorker();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 61a05dc..a983f78 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -21,7 +21,7 @@ package org.apache.tajo.worker;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
+import io.netty.handler.codec.http.QueryStringDecoder;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,25 +30,26 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequest;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NettyClientBase;
@@ -57,8 +58,6 @@ import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
 
-import io.netty.handler.codec.http.QueryStringDecoder;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -135,7 +134,7 @@ public class Task {
   }
 
   public void initPlan() throws IOException {
-    plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan());
+    plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
     if (scanNode != null) {
       for (LogicalNode node : scanNode) {
@@ -173,7 +172,7 @@ public class Task {
     LOG.info("==================================");
     LOG.info("* Stage " + request.getId() + " is initialized");
     LOG.info("* InterQuery: " + interQuery
-        + (interQuery ? ", Use " + this.shuffleType + " shuffle":"") +
+        + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
         ", Fragments (num: " + request.getFragments().size() + ")" +
         ", Fetches (total:" + request.getFetches().size() + ") :");
 
@@ -190,8 +189,21 @@ public class Task {
     LOG.info("==================================");
   }
 
+  private void startScriptExecutors() throws IOException {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.start(systemConf);
+    }
+  }
+
+  private void stopScriptExecutors() {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.shutdown();
+    }
+  }
+
   public void init() throws IOException {
     initPlan();
+    startScriptExecutors();
 
     if (context.getState() == TaskAttemptState.TA_PENDING) {
       // initialize a task temporal dir
@@ -257,11 +269,13 @@ public class Task {
   }
 
   public void kill() {
+    stopScriptExecutors();
     context.setState(TaskAttemptState.TA_KILLED);
     context.stop();
   }
 
   public void abort() {
+    stopScriptExecutors();
     context.stop();
   }
 
@@ -410,6 +424,7 @@ public class Task {
     } catch (Throwable e) {
       error = e ;
       LOG.error(e.getMessage(), e);
+      stopScriptExecutors();
       context.stop();
     } finally {
       if (executor != null) {
@@ -487,6 +502,7 @@ public class Task {
     }
 
     executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
+    stopScriptExecutors();
   }
 
   public TaskHistory createTaskHistory() {
@@ -630,6 +646,7 @@ public class Task {
           if (retryNum == maxRetryNum) {
             LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
           }
+          stopScriptExecutors();
           context.stop(); // retry task
           ctx.getFetchLatch().countDown();
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 706e9b8..58028ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -32,6 +32,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.expr.EvalContext;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -83,6 +84,8 @@ public class TaskAttemptContext {
   private Map<Integer, Long> partitionOutputVolume;
   private HashShuffleAppenderManager hashShuffleAppenderManager;
 
+  private EvalContext evalContext = new EvalContext();
+
   public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext,
                             final TaskAttemptId queryId,
                             final FragmentProto[] fragments,
@@ -403,4 +406,8 @@ public class TaskAttemptContext {
   public HashShuffleAppenderManager getHashShuffleAppenderManager() {
     return hashShuffleAppenderManager;
   }
+
+  public EvalContext getEvalContext() {
+    return evalContext;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a7453853/tajo-core/src/main/resources/python/__init__.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/python/__init__.py b/tajo-core/src/main/resources/python/__init__.py
new file mode 100644
index 0000000..8093a2f
--- /dev/null
+++ b/tajo-core/src/main/resources/python/__init__.py
@@ -0,0 +1,17 @@
+#!/usr/bin/python
+
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
\ No newline at end of file


Mime
View raw message