tajo-commits mailing list archives

From hyun...@apache.org
Subject [29/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)
Date Fri, 18 Apr 2014 09:19:51 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
new file mode 100644
index 0000000..cb66582
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlanningException;
+
+public interface QueryRewriteEngine {
+  /**
+   * Rewrite a logical plan with all query rewrite rules added to this engine.
+   *
+   * @param plan The plan to be rewritten with all query rewrite rules.
+   * @return The rewritten plan.
+   */
+  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
new file mode 100644
index 0000000..89854df
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlanningException;
+
+/**
+ * An interface for a rewrite rule.
+ */
+public interface RewriteRule {
+
+  /**
+   * Returns the rewrite rule name. It is used for debugging and
+   * building an optimization history.
+   *
+   * @return The rewrite rule name
+   */
+  String getName();
+
+  /**
+   * This method checks whether this rewrite rule can be applied to a given query plan.
+   * For example, selection push down cannot be applied to a query plan without any filter.
+   * In such a case, it returns false.
+   *
+   * @param plan The plan to be checked
+   * @return True if this rule can be applied to a given plan. Otherwise, false.
+   */
+  boolean isEligible(LogicalPlan plan);
+
+  /**
+   * Rewrites a logical plan and returns the rewritten plan.
+   * It must be guaranteed that the input logical plan is not modified even after rewrite.
+   * In other words, the rewrite has to modify a plan copied from the input plan.
+   *
+   * @param plan Input logical plan. It will not be modified.
+   * @return The rewritten logical plan.
+   */
+  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
+}
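
These two interfaces together suggest a simple driver loop: an engine keeps an ordered list of rules, asks each rule whether it is eligible for the current plan, and applies the eligible ones in turn. The sketch below is illustrative only; the class name BasicQueryRewriteEngine and its addRewriteRule method are assumptions, not part of this patch.

    package org.apache.tajo.engine.planner.rewrite;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.tajo.engine.planner.LogicalPlan;
    import org.apache.tajo.engine.planner.PlanningException;

    // Hypothetical engine: applies every eligible rule in insertion order.
    public class BasicQueryRewriteEngine implements QueryRewriteEngine {
      private final List<RewriteRule> rules = new ArrayList<RewriteRule>();

      public void addRewriteRule(RewriteRule rule) {
        rules.add(rule);
      }

      @Override
      public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
        for (RewriteRule rule : rules) {
          if (rule.isEligible(plan)) {
            // Each rule must work on a copy; the input plan is never mutated.
            plan = rule.rewrite(plan);
          }
        }
        return plan;
      }
    }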

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
new file mode 100644
index 0000000..44c4ddc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.logical.NodeType;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.KeyValueSetProto;
+
+public class QueryContext extends Options {
+  public static final String COMMAND_TYPE = "tajo.query.command";
+
+  public static final String STAGING_DIR = "tajo.query.staging_dir";
+
+  public static final String USER_NAME = "tajo.query.username";
+
+  public static final String OUTPUT_TABLE_NAME = "tajo.query.output.table";
+  public static final String OUTPUT_TABLE_PATH = "tajo.query.output.path";
+  public static final String OUTPUT_PARTITIONS = "tajo.query.output.partitions";
+  public static final String OUTPUT_OVERWRITE = "tajo.query.output.overwrite";
+  public static final String OUTPUT_AS_DIRECTORY = "tajo.query.output.asdirectory";
+
+  public static final String TRUE_VALUE = "1";
+  public static final String FALSE_VALUE = "0";
+
+  public QueryContext() {}
+
+  public QueryContext(KeyValueSetProto proto) {
+    super(proto);
+  }
+
+  public void put(TajoConf.ConfVars key, String value) {
+    put(key.varname, value);
+  }
+
+  public String get(TajoConf.ConfVars key) {
+    return get(key.varname);
+  }
+
+  public String get(String key) {
+    return super.get(key);
+  }
+
+  public void setBool(String key, boolean val) {
+    put(key, val ? TRUE_VALUE : FALSE_VALUE);
+  }
+
+  public boolean getBool(String key) {
+    String strVal = get(key);
+    return TRUE_VALUE.equals(strVal);
+  }
+
+  public void setUser(String username) {
+    put(USER_NAME, username);
+  }
+
+  public String getUser() {
+    return get(USER_NAME);
+  }
+
+  public void setStagingDir(Path path) {
+    put(STAGING_DIR, path.toUri().toString());
+  }
+
+  public Path getStagingDir() {
+    String strVal = get(STAGING_DIR);
+    return strVal != null ? new Path(strVal) : null;
+  }
+
+  /**
+   * The fact that QueryContext has an output table means this query has a target table.
+   * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO <table name>' statement.
+   * This config is not set if a query has INSERT (OVERWRITE) INTO LOCATION '/path/..'.
+   */
+  public boolean hasOutputTable() {
+    return get(OUTPUT_TABLE_NAME) != null;
+  }
+
+  /**
+   * Set a target table name
+   *
+   * @param tableName The target table name
+   */
+  public void setOutputTable(String tableName) {
+    put(OUTPUT_TABLE_NAME, tableName);
+  }
+
+  public String getOutputTable() {
+    return get(OUTPUT_TABLE_NAME);
+  }
+
+  /**
+   * The fact that QueryContext has an output path means this query will write the output to a specific directory.
+   * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO (<table name>|LOCATION)' statement.
+   *
+   * @return True if this query writes its output to a specific path. Otherwise, false.
+   */
+  public boolean hasOutputPath() {
+    return get(OUTPUT_TABLE_PATH) != null;
+  }
+
+  public void setOutputPath(Path path) {
+    put(OUTPUT_TABLE_PATH, path.toUri().toString());
+  }
+
+  public Path getOutputPath() {
+    String strVal = get(OUTPUT_TABLE_PATH);
+    return strVal != null ? new Path(strVal) : null;
+  }
+
+  public boolean hasPartition() {
+    return get(OUTPUT_PARTITIONS) != null;
+  }
+
+  public void setPartitionMethod(PartitionMethodDesc partitionMethodDesc) {
+    put(OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null);
+  }
+
+  public PartitionMethodDesc getPartitionMethod() {
+    return PartitionMethodDesc.fromJson(get(OUTPUT_PARTITIONS));
+  }
+
+  public void setOutputOverwrite() {
+    setBool(OUTPUT_OVERWRITE, true);
+  }
+
+  public boolean isOutputOverwrite() {
+    return getBool(OUTPUT_OVERWRITE);
+  }
+
+  public void setFileOutput() {
+    setBool(OUTPUT_AS_DIRECTORY, true);
+  }
+
+  public boolean isFileOutput() {
+    return getBool(OUTPUT_AS_DIRECTORY);
+  }
+
+  public void setCommandType(NodeType nodeType) {
+    put(COMMAND_TYPE, nodeType.name());
+  }
+
+  public NodeType getCommandType() {
+    String strVal = get(COMMAND_TYPE);
+    return strVal != null ? NodeType.valueOf(strVal) : null;
+  }
+
+  public void setCreateTable() {
+    setCommandType(NodeType.CREATE_TABLE);
+  }
+
+  public boolean isCreateTable() {
+    return getCommandType() == NodeType.CREATE_TABLE;
+  }
+
+  public void setInsert() {
+    setCommandType(NodeType.INSERT);
+  }
+
+  public boolean isInsert() {
+    return getCommandType() == NodeType.INSERT;
+  }
+
+  public void setHiveQueryMode() {
+    setBool("hive.query.mode", true);
+  }
+
+  public boolean isHiveQueryMode() {
+    return getBool("hive.query.mode");
+  }
+}
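
As a usage sketch, a 'CREATE TABLE ... AS SELECT' query might populate a QueryContext roughly as follows; the user name, paths, and table name are made-up values:

    QueryContext context = new QueryContext();
    context.setUser("tajo");
    context.setStagingDir(new Path("hdfs://namenode/tmp/staging/q_0001"));
    context.setOutputTable("lineitem_copy");
    context.setOutputPath(new Path("hdfs://namenode/tajo/warehouse/lineitem_copy"));
    context.setCreateTable();

    // The flags can then be queried on the receiving side:
    assert context.hasOutputTable();
    assert context.isCreateTable();
    assert !context.isInsert();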

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
new file mode 100644
index 0000000..383a787
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+import java.net.URI;
+import java.util.List;
+
+public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
+
+  QueryUnitAttemptId getId();
+  List<CatalogProtos.FragmentProto> getFragments();
+  String getOutputTableId();
+  boolean isClusteredOutput();
+  String getSerializedData();
+  boolean isInterQuery();
+  void setInterQuery();
+  void addFetch(String name, URI uri);
+  List<TajoWorkerProtocol.Fetch> getFetches();
+  boolean shouldDie();
+  void setShouldDie();
+  QueryContext getQueryContext();
+  DataChannel getDataChannel();
+  Enforcer getEnforcer();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
new file mode 100644
index 0000000..d4006e0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class QueryUnitRequestImpl implements QueryUnitRequest {
+	
+  private QueryUnitAttemptId id;
+  private List<FragmentProto> fragments;
+  private String outputTable;
+	private boolean isUpdated;
+	private boolean clusteredOutput;
+	private String serializedData;     // logical node
+	private Boolean interQuery;
+	private List<Fetch> fetches;
+  private Boolean shouldDie;
+  private QueryContext queryContext;
+  private DataChannel dataChannel;
+  private Enforcer enforcer;
+	
+	private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
+	private QueryUnitRequestProto.Builder builder = null;
+	private boolean viaProto = false;
+	
+	public QueryUnitRequestImpl() {
+		builder = QueryUnitRequestProto.newBuilder();
+		this.id = null;
+		this.isUpdated = false;
+	}
+	
+	public QueryUnitRequestImpl(QueryUnitAttemptId id, List<FragmentProto> fragments,
+			String outputTable, boolean clusteredOutput,
+			String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
+		this();
+		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer);
+	}
+	
+	public QueryUnitRequestImpl(QueryUnitRequestProto proto) {
+		this.proto = proto;
+		viaProto = true;
+		id = null;
+		isUpdated = false;
+	}
+	
+	public void set(QueryUnitAttemptId id, List<FragmentProto> fragments,
+			String outputTable, boolean clusteredOutput,
+			String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
+		this.id = id;
+		this.fragments = fragments;
+		this.outputTable = outputTable;
+		this.clusteredOutput = clusteredOutput;
+		this.serializedData = serializedData;
+		this.isUpdated = true;
+    this.queryContext = queryContext;
+    this.dataChannel = dataChannel;
+    this.enforcer = enforcer;
+	}
+
+	@Override
+	public QueryUnitRequestProto getProto() {
+		mergeLocalToProto();
+		proto = viaProto ? proto : builder.build();
+		viaProto = true;
+		return proto;
+	}
+
+	@Override
+	public QueryUnitAttemptId getId() {
+		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (id != null) {
+			return this.id;
+		}
+		if (!p.hasId()) {
+			return null;
+		}
+		this.id = new QueryUnitAttemptId(p.getId());
+		return this.id;
+	}
+
+	@Override
+	public List<FragmentProto> getFragments() {
+		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (fragments != null) {
+			return fragments;
+		}
+		fragments = new ArrayList<FragmentProto>();
+		for (int i = 0; i < p.getFragmentsCount(); i++) {
+			fragments.add(p.getFragments(i));
+		}
+		return this.fragments;
+	}
+
+	@Override
+	public String getOutputTableId() {
+		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (outputTable != null) {
+			return this.outputTable;
+		}
+		if (!p.hasOutputTable()) {
+			return null;
+		}
+		this.outputTable = p.getOutputTable();
+		return this.outputTable;
+	}
+
+	@Override
+	public boolean isClusteredOutput() {
+		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (isUpdated) {
+			return this.clusteredOutput;
+		}
+		if (!p.hasClusteredOutput()) {
+			return false;
+		}
+		this.clusteredOutput = p.getClusteredOutput();
+		this.isUpdated = true;
+		return this.clusteredOutput;
+	}
+
+	@Override
+	public String getSerializedData() {
+		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (this.serializedData != null) {
+			return this.serializedData;
+		}
+		if (!p.hasSerializedData()) {
+			return null;
+		}
+		this.serializedData = p.getSerializedData();
+		return this.serializedData;
+	}
+	
+	public boolean isInterQuery() {
+	  QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (interQuery != null) {
+      return interQuery;
+    }
+    if (!p.hasInterQuery()) {
+      return false;
+    }
+    this.interQuery = p.getInterQuery();
+    return this.interQuery;
+	}
+	
+	public void setInterQuery() {
+	  maybeInitBuilder();
+	  this.interQuery = true;
+	}
+
+	public void addFetch(String name, URI uri) {
+	  maybeInitBuilder();
+	  initFetches();
+	  fetches.add(Fetch.newBuilder()
+	      .setName(name)
+	      .setUrls(uri.toString())
+	      .build());
+	}
+
+  public QueryContext getQueryContext() {
+    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (queryContext != null) {
+      return queryContext;
+    }
+    if (!p.hasQueryContext()) {
+      return null;
+    }
+    this.queryContext = new QueryContext(p.getQueryContext());
+    return this.queryContext;
+  }
+
+  public void setQueryContext(QueryContext queryContext) {
+    maybeInitBuilder();
+    this.queryContext = queryContext;
+  }
+
+  public void setDataChannel(DataChannel dataChannel) {
+    maybeInitBuilder();
+    this.dataChannel = dataChannel;
+  }
+
+  @Override
+  public DataChannel getDataChannel() {
+    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (dataChannel != null) {
+      return dataChannel;
+    }
+    if (!p.hasDataChannel()) {
+      return null;
+    }
+    this.dataChannel = new DataChannel(p.getDataChannel());
+    return this.dataChannel;
+  }
+
+  @Override
+  public Enforcer getEnforcer() {
+    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (enforcer != null) {
+      return enforcer;
+    }
+    if (!p.hasEnforcer()) {
+      return null;
+    }
+    this.enforcer = new Enforcer(p.getEnforcer());
+    return this.enforcer;
+  }
+
+  public List<Fetch> getFetches() {
+    initFetches();
+    return this.fetches;
+  }
+	
+	private void initFetches() {
+	  if (this.fetches != null) {
+      return;
+    }
+    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    this.fetches = new ArrayList<Fetch>();
+    for(Fetch fetch : p.getFetchesList()) {
+      fetches.add(fetch);
+    }
+	}
+
+  @Override
+  public boolean shouldDie() {
+    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (shouldDie != null) {
+      return shouldDie;
+    }
+    if (!p.hasShouldDie()) {
+      return false;
+    }
+    this.shouldDie = p.getShouldDie();
+    return this.shouldDie;
+  }
+
+  @Override
+  public void setShouldDie() {
+    maybeInitBuilder();
+    shouldDie = true;
+  }
+
+  private void maybeInitBuilder() {
+		if (viaProto || builder == null) {
+			builder = QueryUnitRequestProto.newBuilder(proto);
+		}
+		viaProto = true;
+	}
+	
+	private void mergeLocalToBuilder() {
+		if (id != null) {
+			builder.setId(this.id.getProto());
+		}
+		if (fragments != null) {
+			for (int i = 0; i < fragments.size(); i++) {
+				builder.addFragments(fragments.get(i));
+			}
+		}
+		if (this.outputTable != null) {
+			builder.setOutputTable(this.outputTable);
+		}
+		if (this.isUpdated) {
+			builder.setClusteredOutput(this.clusteredOutput);
+		}
+		if (this.serializedData != null) {
+			builder.setSerializedData(this.serializedData);
+		}
+		if (this.interQuery != null) {
+		  builder.setInterQuery(this.interQuery);
+		}
+		if (this.fetches != null) {
+		  builder.addAllFetches(this.fetches);
+		}
+    if (this.shouldDie != null) {
+      builder.setShouldDie(this.shouldDie);
+    }
+    if (this.queryContext != null) {
+      builder.setQueryContext(queryContext.getProto());
+    }
+    if (this.dataChannel != null) {
+      builder.setDataChannel(dataChannel.getProto());
+    }
+    if (this.enforcer != null) {
+      builder.setEnforcer(enforcer.getProto());
+    }
+	}
+
+	private void mergeLocalToProto() {
+		if(viaProto) {
+			maybeInitBuilder();
+		}
+		mergeLocalToBuilder();
+		proto = builder.build();
+		viaProto = true;
+	}
+}
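
QueryUnitRequestImpl follows the usual Hadoop-style protobuf record pattern: getters read the local field first and fall back to the underlying proto, setters only touch local fields, and getProto() merges the local state into the builder on demand. A hedged round-trip sketch, assuming an already-populated request object named request:

    // Serialize on the master side; this merges local fields into the proto.
    QueryUnitRequestProto wire = request.getProto();

    // Rebuild on the worker side; getters lazily pull values out of the proto.
    QueryUnitRequest received = new QueryUnitRequestImpl(wire);
    QueryUnitAttemptId attemptId = received.getId();
    boolean interQuery = received.isInterQuery();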

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
new file mode 100644
index 0000000..c882607
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+
+public class SchemaUtil {
+  public static Schema merge(Schema left, Schema right) {
+    Schema merged = new Schema();
+    for(Column col : left.getColumns()) {
+      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn(col);
+      }
+    }
+    for(Column col : right.getColumns()) {
+      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn(col);
+      }
+    }
+    
+    return merged;
+  }
+
+  /**
+   * Get common columns to be used as join keys of natural joins.
+   */
+  public static Schema getNaturalJoinColumns(Schema left, Schema right) {
+    Schema common = new Schema();
+    for (Column outer : left.getColumns()) {
+      if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
+        common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
+      }
+    }
+    
+    return common;
+  }
+
+  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
+    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
+    if (tableName != null) {
+      logicalSchema.setQualifier(tableName);
+    }
+    return logicalSchema;
+  }
+
+  public static <T extends Schema> T clone(Schema schema) {
+    try {
+      T copy = (T) schema.clone();
+      return copy;
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
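
For example, given a left schema a.(id, name) and a right schema b.(id, score), getNaturalJoinColumns returns a schema containing the single shared column id, unqualified. A hedged sketch, assuming the Schema#addColumn(String, Type) convenience method from the catalog API (org.apache.tajo.common.TajoDataTypes.Type):

    Schema left = new Schema();
    left.addColumn("a.id", Type.INT4);
    left.addColumn("a.name", Type.TEXT);

    Schema right = new Schema();
    right.addColumn("b.id", Type.INT4);
    right.addColumn("b.score", Type.FLOAT8);

    // Natural join keys are matched by simple (unqualified) column name.
    Schema joinKeys = SchemaUtil.getNaturalJoinColumns(left, right);  // contains: id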

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java
new file mode 100644
index 0000000..23b1e5d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.PrintWriter;
+import java.lang.Thread.UncaughtExceptionHandler;
+
+public class ThreadUtil {
+	protected static final Log LOG = LogFactory.getLog(ThreadUtil.class);
+
+	  /**
+	   * Utility method that sets name, daemon status and starts passed thread.
+	   * @param t thread to run
+	   * @return Returns the passed Thread <code>t</code>.
+	   */
+	  public static Thread setDaemonThreadRunning(final Thread t) {
+	    return setDaemonThreadRunning(t, t.getName());
+	  }
+
+	  /**
+	   * Utility method that sets name, daemon status and starts passed thread.
+	   * @param t thread to rename and start
+	   * @param name new name
+	   * @return Returns the passed Thread <code>t</code>.
+	   */
+	  public static Thread setDaemonThreadRunning(final Thread t,
+	    final String name) {
+	    return setDaemonThreadRunning(t, name, null);
+	  }
+
+	  /**
+	   * Utility method that sets name, daemon status and starts passed thread.
+	   * @param t thread to rename and start
+	   * @param name new name
+	   * @param handler A handler to set on the thread. Pass null to use
+	   * the default handler.
+	   * @return Returns the passed Thread <code>t</code>.
+	   */
+	  public static Thread setDaemonThreadRunning(final Thread t,
+	    final String name, final UncaughtExceptionHandler handler) {
+	    t.setName(name);
+	    if (handler != null) {
+	      t.setUncaughtExceptionHandler(handler);
+	    }
+	    t.setDaemon(true);
+	    t.start();
+	    return t;
+	  }
+
+	  /**
+	   * Shutdown passed thread using isAlive and join.
+	   * @param t Thread to shutdown
+	   */
+	  public static void shutdown(final Thread t) {
+	    shutdown(t, 0);
+	  }
+
+	  /**
+	   * Shutdown passed thread using isAlive and join.
+	   * @param t Thread to shutdown
+	   * @param joinwait Time in milliseconds to wait per join attempt; pass 0 to wait forever.
+	   */
+	  public static void shutdown(final Thread t, final long joinwait) {
+	    if (t == null) return;
+	    while (t.isAlive()) {
+	      try {
+	        t.join(joinwait);
+	      } catch (InterruptedException e) {
+	        LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
+	      }
+	    }
+	  }
+
+
+	  /**
+	   * Waits on the passed thread to die, dumping a thread dump every
+	   * minute while it is up.
+	   * @param t the thread to wait on
+	   * @throws InterruptedException
+	   */
+	  public static void threadDumpingIsAlive(final Thread t)
+	  throws InterruptedException {
+	    if (t == null) {
+	      return;
+	    }
+
+	    while (t.isAlive()) {
+	      t.join(60 * 1000);
+	      if (t.isAlive()) {
+	        ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+	            "Automatic Stack Trace every 60 seconds waiting on " +
+	            t.getName());
+	      }
+	    }
+	  }
+
+	  /**
+	   * @param millis How long to sleep for in milliseconds.
+	   */
+	  public static void sleep(int millis) {
+	    try {
+	      Thread.sleep(millis);
+	    } catch (InterruptedException e) {
+	      // restore the interrupt status instead of swallowing it
+	      Thread.currentThread().interrupt();
+	    }
+	  }
+
+	  /**
+	   * Sleeps for the given amount of time even if interrupted. Preserves
+	   * the interrupt status.
+	   * @param msToWait the amount of time to sleep in milliseconds
+	   */
+	  public static void sleepWithoutInterrupt(final long msToWait) {
+	    long timeMillis = System.currentTimeMillis();
+	    long endTime = timeMillis + msToWait;
+	    boolean interrupted = false;
+	    while (timeMillis < endTime) {
+	      try {
+	        Thread.sleep(endTime - timeMillis);
+	      } catch (InterruptedException ex) {
+	        interrupted = true;
+	      }
+	      timeMillis = System.currentTimeMillis();
+	    }
+
+	    if (interrupted) {
+	      Thread.currentThread().interrupt();
+	    }
+	  }
+}
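
A small usage sketch for the most common helpers; the thread name and the work inside run() are made up:

    Thread flusher = ThreadUtil.setDaemonThreadRunning(new Thread(new Runnable() {
      public void run() {
        // ... flush some buffered state ...
        ThreadUtil.sleepWithoutInterrupt(1000);  // back off without losing the interrupt status
      }
    }), "cache-flusher");

    // Later, during shutdown: join in 5-second intervals until the thread exits.
    ThreadUtil.shutdown(flusher, 5000);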

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
new file mode 100644
index 0000000..00647b5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TupleCache {
+  private static TupleCache instance;
+
+  private Map<TupleCacheKey, List<Tuple>> broadcastTupleCacheData
+      = new HashMap<TupleCacheKey, List<Tuple>>();
+  private Map<TupleCacheKey, TupleCacheStatus> broadcastTupleCacheStatus
+      = new HashMap<TupleCacheKey, TupleCacheStatus>();
+
+  private Object lockMonitor = new Object();
+
+  public enum TupleCacheStatus {
+    STARTED,
+    ENDED
+  }
+
+  private TupleCache() {
+  }
+
+  public static synchronized TupleCache getInstance() {
+    if (instance == null) {
+      instance = new TupleCache();
+    }
+    return instance;
+  }
+
+  public Object getLockMonitor() {
+    return lockMonitor;
+  }
+
+  public void removeBroadcastCache(ExecutionBlockId ebId) {
+    if (ebId == null) {
+      return;
+    }
+    synchronized (lockMonitor) {
+      TupleCacheKey matchedKey = null;
+      for (TupleCacheKey eachKey: broadcastTupleCacheStatus.keySet()) {
+        if (eachKey.ebId.equals(ebId.toString())) {
+          matchedKey = eachKey;
+          break;
+        }
+      }
+      if (matchedKey != null) {
+        broadcastTupleCacheStatus.remove(matchedKey);
+        broadcastTupleCacheData.remove(matchedKey);
+      }
+    }
+  }
+
+  public void addBroadcastCache(TupleCacheKey cacheKey, List<Tuple> cacheData) {
+    synchronized (lockMonitor) {
+      if (broadcastTupleCacheStatus.containsKey(cacheKey) &&
+          broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED) {
+        return;
+      }
+      broadcastTupleCacheData.put(cacheKey, cacheData);
+      broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.ENDED);
+      lockMonitor.notifyAll();
+    }
+  }
+
+  public boolean lockBroadcastScan(TupleCacheKey cacheKey) {
+    synchronized (lockMonitor) {
+      if (broadcastTupleCacheStatus.containsKey(cacheKey)) {
+        return false;
+      } else {
+        broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.STARTED);
+        return true;
+      }
+    }
+  }
+
+  public boolean isBroadcastCacheReady(TupleCacheKey cacheKey) {
+    synchronized (lockMonitor) {
+      if (!broadcastTupleCacheStatus.containsKey(cacheKey)) {
+        return false;
+      }
+      return broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED;
+    }
+  }
+
+  public TupleCacheScanner openCacheScanner(TupleCacheKey cacheKey, Schema schema) throws IOException {
+    synchronized (lockMonitor) {
+      List<Tuple> cacheData = broadcastTupleCacheData.get(cacheKey);
+      if (cacheData != null) {
+        TupleCacheScanner scanner = new TupleCacheScanner(cacheData, schema);
+        scanner.init();
+        return scanner;
+      } else {
+        return null;
+      }
+    }
+  }
+}
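
The intended calling protocol, as a hedged sketch: a scanning task first tries to take the per-key lock; the winner materializes the tuples and publishes them, while every other task waits on the lock monitor until the cache is ready and then opens a scanner over it. The key, the scanWholeTable() helper, and the ebId and schema variables below are illustrative, and exception handling is elided:

    TupleCache cache = TupleCache.getInstance();
    TupleCacheKey key = new TupleCacheKey(ebId.toString(), "small_table");

    if (cache.lockBroadcastScan(key)) {
      List<Tuple> tuples = scanWholeTable();   // hypothetical scan of the broadcast table
      cache.addBroadcastCache(key, tuples);    // marks the cache ENDED and notifies waiters
    } else {
      synchronized (cache.getLockMonitor()) {
        while (!cache.isBroadcastCacheReady(key)) {
          cache.getLockMonitor().wait();       // woken up by addBroadcastCache()
        }
      }
    }

    Scanner scanner = cache.openCacheScanner(key, schema);
    for (Tuple tuple = scanner.next(); tuple != null; tuple = scanner.next()) {
      // ... consume the cached tuples ...
    }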

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
new file mode 100644
index 0000000..ad9204f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+public class TupleCacheKey {
+  String ebId;
+  String tableName;
+
+  public TupleCacheKey(String ebId, String tableName) {
+    this.ebId = ebId;
+    this.tableName = tableName;
+  }
+
+  public String getEbId() {
+    return ebId;
+  }
+
+  public void setEbId(String ebId) {
+    this.ebId = ebId;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof TupleCacheKey && toString().equals(o.toString());
+  }
+
+  @Override
+  public String toString() {
+    return ebId + "," + tableName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
new file mode 100644
index 0000000..3b91f94
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class TupleCacheScanner implements Scanner {
+  List<Tuple> cacheData;
+  Schema schema;
+  Iterator<Tuple> it;
+  int count;
+  TableStats inputStats = new TableStats();
+
+  public TupleCacheScanner(List<Tuple> cacheData, Schema schema) {
+    this.cacheData = cacheData;
+    this.schema = schema;
+  }
+  @Override
+  public void init() throws IOException {
+    inputStats.setNumRows(cacheData.size());
+    inputStats.setReadBytes(0);
+    it = cacheData.iterator();
+    count = 0;
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (it.hasNext()) {
+      count++;
+      return it.next();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    init();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return true;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    if (cacheData.size() == 0) {
+      return 1.0f;
+    }
+    return (float) count / cacheData.size();
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return inputStats;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
new file mode 100644
index 0000000..86f4935
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TupleUtil {
+
+  public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
+      throws UnsupportedEncodingException {
+    return rangeToQuery(range, last, RowStoreUtil.createEncoder(schema));
+  }
+
+  public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder)
+      throws UnsupportedEncodingException {
+    StringBuilder sb = new StringBuilder();
+    byte [] firstKeyBytes = encoder.toBytes(range.getStart());
+    byte [] endKeyBytes = encoder.toBytes(range.getEnd());
+
+    String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes));
+    String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes));
+
+    sb.append("start=")
+        .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
+        .append("&")
+        .append("end=")
+        .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
+
+    if (last) {
+      sb.append("&final=true");
+    }
+
+    return sb.toString();
+  }
+
+  public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats) {
+
+    Map<Column, ColumnStats> statSet = Maps.newHashMap();
+    for (ColumnStats stat : colStats) {
+      statSet.put(stat.getColumn(), stat);
+    }
+
+    for (Column col : target.getColumns()) {
+      Preconditions.checkState(statSet.containsKey(col),
+          "ERROR: Invalid Column Stats (column stats: " + colStats + ", there exists not target " + col);
+    }
+
+    Tuple startTuple = new VTuple(target.size());
+    Tuple endTuple = new VTuple(target.size());
+    int i = 0;
+
+    // In outer join, empty table could be searched.
+    // As a result, min value and max value would be null.
+    // So, we should put NullDatum for this case.
+    for (Column col : target.getColumns()) {
+      if (sortSpecs[i].isAscending()) {
+        if (statSet.get(col).getMinValue() != null)
+          startTuple.put(i, statSet.get(col).getMinValue());
+        else
+          startTuple.put(i, DatumFactory.createNullDatum());
+
+        if (statSet.get(col).getMaxValue() != null)
+          endTuple.put(i, statSet.get(col).getMaxValue());
+        else
+          endTuple.put(i, DatumFactory.createNullDatum());
+      } else {
+        if (statSet.get(col).getMaxValue() != null)
+          startTuple.put(i, statSet.get(col).getMaxValue());
+        else
+          startTuple.put(i, DatumFactory.createNullDatum());
+
+        if (statSet.get(col).getMinValue() != null)
+          endTuple.put(i, statSet.get(col).getMinValue());
+        else
+          endTuple.put(i, DatumFactory.createNullDatum());
+      }
+      i++;
+    }
+    return new TupleRange(sortSpecs, startTuple, endTuple);
+  }
+
+  /**
+   * It creates a tuple of a given size filled with NULL values in all fields.
+   * It is usually used in outer join algorithms.
+   *
+   * @param size The number of columns of the tuple to create
+   * @return The created tuple filled with NULL values
+   */
+  public static Tuple createNullPaddedTuple(int size){
+    VTuple aTuple = new VTuple(size);
+    for (int i = 0; i < size; i++) {
+      aTuple.put(i, DatumFactory.createNullDatum());
+    }
+    return aTuple;
+  }
+
+  @SuppressWarnings("unused")
+  public static Collection<Tuple> filterTuple(Schema schema, Collection<Tuple> tupleBlock, EvalNode filterCondition) {
+    TupleBlockFilterScanner filter = new TupleBlockFilterScanner(schema, tupleBlock, filterCondition);
+    return filter.nextBlock();
+  }
+
+  private static class TupleBlockFilterScanner {
+    private EvalNode qual;
+    private Iterator<Tuple> iterator;
+    private Schema schema;
+
+    public TupleBlockFilterScanner(Schema schema, Collection<Tuple> tuples, EvalNode qual) {
+      this.schema = schema;
+      this.qual = qual;
+      this.iterator = tuples.iterator();
+    }
+
+    public List<Tuple> nextBlock() {
+      List<Tuple> results = Lists.newArrayList();
+
+      Tuple tuple;
+      while (iterator.hasNext()) {
+        tuple = iterator.next();
+        if (qual.eval(schema, tuple).isTrue()) {
+          results.add(tuple);
+        }
+      }
+      return results;
+    }
+  }
+
+  /**
+   * Take a look at a column partition path. A partition path consists
+   * of a table path part and column values part. This method transforms
+   * a partition path into a tuple with a given partition column schema.
+   *
+   * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
+   *                   ^^^^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^^^^^^^^^
+   *                      table path part        column values part
+   *
+   * When a file path is given, it behaves in one of two ways depending on the beNullIfFile flag.
+   * If it is true, it returns NULL when a given path is a file.
+   * Otherwise, it returns a built tuple regardless of file or directory.
+   *
+   * @param partitionColumnSchema The partition column schema
+   * @param partitionPath The partition path
+   * @param beNullIfFile If true, this method returns NULL when a given path is a file.
+   * @return The tuple transformed from a column values part.
+   */
+  public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
+                                                  boolean beNullIfFile) {
+    int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
+
+    if (startIdx == -1) { // if there is no partition column in the path
+      return null;
+    }
+    String columnValuesPart = partitionPath.toString().substring(startIdx);
+
+    String [] columnValues = columnValuesPart.split("/");
+
+    // more path segments than partition columns means the path points to a file
+    if (beNullIfFile && partitionColumnSchema.size() < columnValues.length) {
+      return null;
+    }
+
+    Tuple tuple = new VTuple(partitionColumnSchema.size());
+    int i = 0;
+    for (; i < columnValues.length && i < partitionColumnSchema.size(); i++) {
+      String [] parts = columnValues[i].split("=");
+      if (parts.length != 2) {
+        return null;
+      }
+      int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
+      Column keyColumn = partitionColumnSchema.getColumn(columnId);
+      tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), parts[1]));
+    }
+    for (; i < partitionColumnSchema.size(); i++) {
+      tuple.put(i, NullDatum.get());
+    }
+    return tuple;
+  }
+
+  /**
+   * Get a prefix of column partition path. For example, consider a column partition (col1, col2).
+   * Then, you will get a string 'col1='.
+   *
+   * @param partitionColumn the schema of column partition
+   * @return The first part string of column partition path.
+   */
+  private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(partitionColumn.getColumn(0).getSimpleName()).append("=");
+    return sb.toString();
+  }
+}
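
For instance, using the two-column partition layout from the Javadoc above, buildTupleFromPartitionPath turns the column-values part of a path back into a tuple. A hedged sketch, again assuming Schema#addColumn(String, Type):

    Schema partCols = new Schema();
    partCols.addColumn("col1", Type.TEXT);
    partCols.addColumn("col2", Type.TEXT);

    Path partitionPath = new Path("hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def");
    Tuple tuple = TupleUtil.buildTupleFromPartitionPath(partCols, partitionPath, false);
    // tuple is ("abc", "def"); a path containing only col1=abc would yield ("abc", NULL)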

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
new file mode 100644
index 0000000..320a5aa
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.master.event.TaskRequestEvent;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+
+public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
+
+  protected int hostLocalAssigned;
+  protected int rackLocalAssigned;
+  protected int totalAssigned;
+
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public AbstractTaskScheduler(String name) {
+    super(name);
+  }
+
+  public int getHostLocalAssigned() {
+    return hostLocalAssigned;
+  }
+
+  public int getRackLocalAssigned() {
+    return rackLocalAssigned;
+  }
+
+  public int getTotalAssigned() {
+    return totalAssigned;
+  }
+
+  public abstract void handleTaskRequestEvent(TaskRequestEvent event);
+  public abstract int remainingScheduledObjectNum();
+}
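
Concrete schedulers extend this service with an event-driven loop. A hedged skeleton; the class name and the handler bodies are placeholders, not part of this patch:

    public class SimpleTaskScheduler extends AbstractTaskScheduler {
      public SimpleTaskScheduler() {
        super(SimpleTaskScheduler.class.getSimpleName());
      }

      @Override
      public void handle(TaskSchedulerEvent event) {
        // react to scheduler events, e.g. newly scheduled fragments (omitted)
      }

      @Override
      public void handleTaskRequestEvent(TaskRequestEvent event) {
        // match the requesting worker against pending tasks, then update
        // hostLocalAssigned / rackLocalAssigned / totalAssigned accordingly
      }

      @Override
      public int remainingScheduledObjectNum() {
        return 0;  // a real scheduler returns the number of unassigned tasks
      }
    }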

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
new file mode 100644
index 0000000..59b071a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+public abstract class ContainerProxy {
+  protected static final Log LOG = LogFactory.getLog(ContainerProxy.class);
+
+  public static final FsPermission QUERYCONF_FILE_PERMISSION =
+          FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+
+  protected enum ContainerState {
+    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
+  }
+
+  protected final ExecutionBlockId executionBlockId;
+  protected Configuration conf;
+  protected QueryMasterTask.QueryMasterTaskContext context;
+
+  protected ContainerState state;
+  // store enough information to be able to cleanup the container
+  protected Container container;
+  protected ContainerId containerID;
+  protected String hostName;
+  protected int port = -1;
+
+  public abstract void launch(ContainerLaunchContext containerLaunchContext);
+  public abstract void stopContainer();
+
+  public ContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf,
+                        ExecutionBlockId executionBlockId, Container container) {
+    this.context = context;
+    this.conf = conf;
+    this.state = ContainerState.PREP;
+    this.container = container;
+    this.executionBlockId = executionBlockId;
+    this.containerID = container.getId();
+  }
+
+  public synchronized boolean isCompletelyDone() {
+    return state == ContainerState.DONE || state == ContainerState.FAILED;
+  }
+
+  public String getTaskHostName() {
+    return this.hostName;
+  }
+
+  public int getTaskPort() {
+    return this.port;
+  }
+
+  public String getId() {
+    return executionBlockId.toString();
+  }
+}
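
Subclasses implement the actual container lifecycle against a specific resource manager. A hedged skeleton; the class name and the method bodies are placeholders:

    public class SimpleContainerProxy extends ContainerProxy {
      public SimpleContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf,
                                  ExecutionBlockId executionBlockId, Container container) {
        super(context, conf, executionBlockId, container);
      }

      @Override
      public void launch(ContainerLaunchContext containerLaunchContext) {
        this.state = ContainerState.RUNNING;
        // start the remote container and record hostName/port for later cleanup (omitted)
      }

      @Override
      public void stopContainer() {
        // ask the node to stop the container, then mark it finished
        this.state = ContainerState.DONE;
      }
    }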

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..e4b98d4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.util.NetUtils;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * DefaultFragmentScheduleAlgorithm selects a fragment at random from the candidates that
+ * match the given arguments. For example, when getHostLocalFragment(host, disk) is called,
+ * it selects a fragment among the fragments stored on the given disk of the given host.
+ */
+public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
+  private final static Log LOG = LogFactory.getLog(DefaultFragmentScheduleAlgorithm.class);
+  // host -> (disk id -> fragments on that disk); used for host- and disk-local lookups
+  private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
+      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
+  // rack name -> fragments stored somewhere in that rack; used for rack-local lookups
+  private Map<String, Set<FragmentPair>> rackFragmentMapping =
+      new HashMap<String, Set<FragmentPair>>();
+  private int fragmentNum = 0;
+  private Random random = new Random(System.currentTimeMillis());
+
+  public static class FragmentsPerDisk {
+    private Integer diskId;
+    private Set<FragmentPair> fragmentPairSet;
+
+    public FragmentsPerDisk(Integer diskId) {
+      this.diskId = diskId;
+      this.fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
+    }
+
+    public Integer getDiskId() {
+      return diskId;
+    }
+
+    public Set<FragmentPair> getFragmentPairSet() {
+      return fragmentPairSet;
+    }
+
+    public void addFragmentPair(FragmentPair fragmentPair) {
+      fragmentPairSet.add(fragmentPair);
+    }
+
+    public boolean removeFragmentPair(FragmentPair fragmentPair) {
+      return fragmentPairSet.remove(fragmentPair);
+    }
+
+    public int size() {
+      return fragmentPairSet.size();
+    }
+
+    public Iterator<FragmentPair> getFragmentPairIterator() {
+      return fragmentPairSet.iterator();
+    }
+
+    public boolean isEmpty() {
+      return fragmentPairSet.isEmpty();
+    }
+  }
+
+  @Override
+  public void addFragment(FragmentPair fragmentPair) {
+    // hosts and diskIds are parallel arrays: diskIds[i] is the disk holding the replica on hosts[i]
+    String[] hosts = fragmentPair.getLeftFragment().getHosts();
+    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    for (int i = 0; i < hosts.length; i++) {
+      addFragment(hosts[i], diskIds[i], fragmentPair);
+    }
+    fragmentNum++; // count each pair once, regardless of how many replicas it has
+  }
+
+  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
+    // update the per-host fragment map
+    String normalizedHost = NetUtils.normalizeHost(host);
+    Map<Integer, FragmentsPerDisk> diskFragmentMap;
+    if (fragmentHostMapping.containsKey(normalizedHost)) {
+      diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+    } else {
+      diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>();
+      fragmentHostMapping.put(normalizedHost, diskFragmentMap);
+    }
+    FragmentsPerDisk fragmentsPerDisk;
+    if (diskFragmentMap.containsKey(diskId)) {
+      fragmentsPerDisk = diskFragmentMap.get(diskId);
+    } else {
+      fragmentsPerDisk = new FragmentsPerDisk(diskId);
+      diskFragmentMap.put(diskId, fragmentsPerDisk);
+    }
+    fragmentsPerDisk.addFragmentPair(fragmentPair);
+
+    // update the per-rack fragment map
+    String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
+    Set<FragmentPair> fragmentPairSet;
+    if (rackFragmentMapping.containsKey(rack)) {
+      fragmentPairSet = rackFragmentMapping.get(rack);
+    } else {
+      fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
+      rackFragmentMapping.put(rack, fragmentPairSet);
+    }
+    fragmentPairSet.add(fragmentPair);
+  }
+
+  @Override
+  public void removeFragment(FragmentPair fragmentPair) {
+    boolean removed = false;
+    for (String eachHost : fragmentPair.getLeftFragment().getHosts()) {
+      String normalizedHost = NetUtils.normalizeHost(eachHost);
+      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+      if (diskFragmentMap != null) { // the host may already have been cleaned up
+        for (Entry<Integer, FragmentsPerDisk> entry : diskFragmentMap.entrySet()) {
+          FragmentsPerDisk fragmentsPerDisk = entry.getValue();
+          // only set the flag on success, so a failed removal on a later replica
+          // cannot overwrite an earlier successful one
+          if (fragmentsPerDisk.removeFragmentPair(fragmentPair)) {
+            removed = true;
+            if (fragmentsPerDisk.isEmpty()) {
+              diskFragmentMap.remove(entry.getKey());
+            }
+            if (diskFragmentMap.isEmpty()) {
+              fragmentHostMapping.remove(normalizedHost);
+            }
+            break;
+          }
+        }
+      }
+      String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
+      if (rackFragmentMapping.containsKey(rack)) {
+        Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
+        fragmentPairs.remove(fragmentPair);
+        if (fragmentPairs.size() == 0) {
+          rackFragmentMapping.remove(rack);
+        }
+      }
+    }
+    if (removed) {
+      fragmentNum--;
+    }
+  }
+
+  /**
+   * Select a fragment from a randomly chosen disk of the given host.
+   * @param host the host to which the returned fragment should be local
+   * @return a selected fragment, or null if the host stores no fragments
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host) {
+    String normalizedHost = NetUtils.normalizeHost(host);
+    if (fragmentHostMapping.containsKey(normalizedHost)) {
+      Collection<FragmentsPerDisk> disks = fragmentHostMapping.get(normalizedHost).values();
+      Iterator<FragmentsPerDisk> diskIterator = disks.iterator();
+      int randomIndex = random.nextInt(disks.size());
+      FragmentsPerDisk fragmentsPerDisk = null;
+      // advance randomIndex + 1 times so that index 0 still selects the first disk
+      for (int i = 0; i <= randomIndex; i++) {
+        fragmentsPerDisk = diskIterator.next();
+      }
+
+      if (fragmentsPerDisk != null) {
+        Iterator<FragmentPair> fragmentIterator = fragmentsPerDisk.getFragmentPairIterator();
+        if (fragmentIterator.hasNext()) {
+          return fragmentIterator.next();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Select a fragment among the fragments stored on the given disk of the given host.
+   * The set's iteration order is unspecified, so the choice is effectively arbitrary.
+   * @param host the host to which the returned fragment should be local
+   * @param diskId the id of the disk on that host
+   * @return a selected fragment, or null if no matching fragment exists
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
+    String normalizedHost = NetUtils.normalizeHost(host);
+    if (fragmentHostMapping.containsKey(normalizedHost)) {
+      Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+      if (fragmentsPerDiskMap.containsKey(diskId)) {
+        FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+        if (!fragmentsPerDisk.isEmpty()) {
+          return fragmentsPerDisk.getFragmentPairIterator().next();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Select a fragment among the fragments stored on nodes in the same rack as the host.
+   * The set's iteration order is unspecified, so the choice is effectively arbitrary.
+   * @param host a host whose rack is used for the lookup
+   * @return a selected fragment, or null if the rack stores no fragments
+   */
+  @Override
+  public FragmentPair getRackLocalFragment(String host) {
+    String rack = RackResolver.resolve(host).getNetworkLocation();
+    if (rackFragmentMapping.containsKey(rack)) {
+      Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
+      if (!fragmentPairs.isEmpty()) {
+        return fragmentPairs.iterator().next();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Select an arbitrary fragment among all registered fragments
+   * (the first one in the host map's iteration order).
+   * @return a selected fragment, or null if no fragments remain
+   */
+  @Override
+  public FragmentPair getRandomFragment() {
+    if (!fragmentHostMapping.isEmpty()) {
+      // take the first disk of the first host, then the first fragment on that disk
+      Map<Integer, FragmentsPerDisk> firstHost = fragmentHostMapping.values().iterator().next();
+      FragmentsPerDisk firstDisk = firstHost.values().iterator().next();
+      return firstDisk.getFragmentPairIterator().next();
+    }
+    return null;
+  }
+
+  @Override
+  public FragmentPair[] getAllFragments() {
+    // a pair is indexed once per replica, so collect into a set to avoid duplicates
+    Set<FragmentPair> fragmentPairs = new HashSet<FragmentPair>();
+    for (Map<Integer, FragmentsPerDisk> eachDiskFragmentMap : fragmentHostMapping.values()) {
+      for (FragmentsPerDisk fragmentsPerDisk : eachDiskFragmentMap.values()) {
+        fragmentPairs.addAll(fragmentsPerDisk.getFragmentPairSet());
+      }
+    }
+    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
+  }
+
+  @Override
+  public int size() {
+    return fragmentNum;
+  }
+}
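
A typical caller of FragmentScheduleAlgorithm would try the locality levels in order:
disk-local, then host-local, then rack-local, and finally any fragment. The sketch below
is hypothetical (the fragments, workerHost, and workerDiskId variables are assumed to be
supplied by the surrounding scheduler, and are not part of this patch), but it uses only
the methods defined above.

FragmentScheduleAlgorithm algorithm = new DefaultFragmentScheduleAlgorithm();
for (FragmentPair pair : fragments) {
  // each pair is indexed under every (host, disk) replica and under its rack
  algorithm.addFragment(pair);
}

FragmentPair next = algorithm.getHostLocalFragment(workerHost, workerDiskId); // disk-local
if (next == null) {
  next = algorithm.getHostLocalFragment(workerHost);  // any disk on the same host
}
if (next == null) {
  next = algorithm.getRackLocalFragment(workerHost);  // same rack
}
if (next == null) {
  next = algorithm.getRandomFragment();               // anywhere
}
if (next != null) {
  algorithm.removeFragment(next); // schedule each fragment exactly once
}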

