tajo-commits mailing list archives

From hyun...@apache.org
Subject [23/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)
Date Fri, 18 Apr 2014 11:44:26 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
new file mode 100644
index 0000000..23b0def
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.parser.HiveQLAnalyzer;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.GlobalEngine;
+import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.TajoContainerProxy;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.util.metrics.TajoMetrics;
+import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
+import org.apache.tajo.worker.AbstractResourceAllocator;
+import org.apache.tajo.worker.TajoResourceAllocator;
+import org.apache.tajo.worker.YarnResourceAllocator;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.tajo.TajoProtos.QueryState;
+
+public class QueryMasterTask extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
+
+  // query submission directory is private!
+  final public static FsPermission STAGING_DIR_PERMISSION =
+      FsPermission.createImmutable((short) 0700); // rwx------
+
+  private QueryId queryId;
+
+  private Session session;
+
+  private QueryContext queryContext;
+
+  private QueryMasterTaskContext queryTaskContext;
+
+  private QueryMaster.QueryMasterContext queryMasterContext;
+
+  private Query query;
+
+  private MasterPlan masterPlan;
+
+  private String sql;
+
+  private String logicalPlanJson;
+
+  private TajoAsyncDispatcher dispatcher;
+
+  private final long querySubmitTime;
+
+  private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+
+  private TajoConf systemConf;
+
+  private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
+
+  private AbstractResourceAllocator resourceAllocator;
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private TajoMetrics queryMetrics;
+
+  private Throwable initError;
+
+  public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
+                         QueryId queryId, Session session, QueryContext queryContext, String sql,
+                         String logicalPlanJson) {
+
+    super(QueryMasterTask.class.getName());
+    this.queryMasterContext = queryMasterContext;
+    this.queryId = queryId;
+    this.session = session;
+    this.queryContext = queryContext;
+    this.sql = sql;
+    this.logicalPlanJson = logicalPlanJson;
+    this.querySubmitTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    systemConf = (TajoConf)conf;
+
+    try {
+      queryTaskContext = new QueryMasterTaskContext();
+      String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
+
+      if (resourceManagerClassName.contains(TajoWorkerResourceManager.class.getName())) {
+        resourceAllocator = new TajoResourceAllocator(queryTaskContext);
+      } else {
+        resourceAllocator = new YarnResourceAllocator(queryTaskContext);
+      }
+      addService(resourceAllocator);
+
+      dispatcher = new TajoAsyncDispatcher(queryId.toString());
+      addService(dispatcher);
+
+      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
+      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+      dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
+      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+      dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
+
+      initStagingDir();
+
+      queryMetrics = new TajoMetrics(queryId.toString());
+
+      super.init(systemConf);
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      initError = t;
+    }
+  }
+
+  public boolean isStopped() {
+    return stopped.get();
+  }
+
+  @Override
+  public void start() {
+    startQuery();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+
+    LOG.info("Stopping QueryMasterTask:" + queryId);
+
+    CallFuture future = new CallFuture();
+
+    RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
+    NettyClientBase tmClient = null;
+    try {
+      tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+      masterClientService.stopQueryMaster(null, queryId.getProto(), future);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
+    }
+
+    try {
+      future.get(3, TimeUnit.SECONDS);
+    } catch (Throwable t) {
+      LOG.warn(t);
+    }
+
+    super.stop();
+
+    // TODO: report query metrics to the TajoMaster instead of the console
+    queryMetrics.report(new MetricsConsoleReporter());
+
+    LOG.info("Stopped QueryMasterTask:" + queryId);
+  }
+
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    ExecutionBlockId id = event.getExecutionBlockId();
+    query.getSubQuery(id).handleTaskRequestEvent(event);
+  }
+
+  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
+    public void handle(SubQueryEvent event) {
+      ExecutionBlockId id = event.getSubQueryId();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("SubQueryEventDispatcher:" + id + "," + event.getType());
+      }
+      query.getSubQuery(id).handle(event);
+    }
+  }
+
+  private class TaskEventDispatcher
+      implements EventHandler<TaskEvent> {
+    public void handle(TaskEvent event) {
+      QueryUnitId taskId = event.getTaskId();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
+      }
+      QueryUnit task = query.getSubQuery(taskId.getExecutionBlockId()).
+          getQueryUnit(taskId);
+      task.handle(event);
+    }
+  }
+
+  private class TaskAttemptEventDispatcher
+      implements EventHandler<TaskAttemptEvent> {
+    public void handle(TaskAttemptEvent event) {
+      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
+      SubQuery subQuery = query.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+      QueryUnitAttempt attempt = task.getAttempt(attemptId);
+      attempt.handle(event);
+    }
+  }
+
+  private class TaskSchedulerDispatcher
+      implements EventHandler<TaskSchedulerEvent> {
+    public void handle(TaskSchedulerEvent event) {
+      SubQuery subQuery = query.getSubQuery(event.getExecutionBlockId());
+      subQuery.getTaskScheduler().handle(event);
+    }
+  }
+
+  private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
+    @Override
+    public void handle(LocalTaskEvent event) {
+      TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
+      if (proxy != null) {
+        proxy.killTaskAttempt(event.getTaskAttemptId());
+      }
+    }
+  }
+
+  private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
+    @Override
+    public void handle(QueryMasterQueryCompletedEvent event) {
+      QueryId queryId = event.getQueryId();
+      LOG.info("Query completion notified from " + queryId);
+
+      while (!isTerminatedState(query.getState())) {
+        try {
+          synchronized (this) {
+            wait(10);
+          }
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+      LOG.info("Query final state: " + query.getState());
+      queryMasterContext.stopQuery(queryId);
+    }
+
+    private boolean isTerminatedState(QueryState state) {
+      return
+          state == QueryState.QUERY_SUCCEEDED ||
+          state == QueryState.QUERY_FAILED ||
+          state == QueryState.QUERY_KILLED ||
+          state == QueryState.QUERY_ERROR;
+    }
+  }
+
+  public synchronized void startQuery() {
+    try {
+      if (query != null) {
+        LOG.warn("Query already started");
+        return;
+      }
+      CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
+      LogicalPlanner planner = new LogicalPlanner(catalog);
+      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
+      Expr expr;
+      if (queryContext.isHiveQueryMode()) {
+        HiveQLAnalyzer hiveQLAnalyzer = new HiveQLAnalyzer();
+        expr = hiveQLAnalyzer.parse(sql);
+      } else {
+        SQLAnalyzer analyzer = new SQLAnalyzer();
+        expr = analyzer.parse(sql);
+      }
+      LogicalPlan plan = planner.createPlan(session, expr);
+      optimizer.optimize(plan);
+
+      GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
+      hookManager.addHook(new GlobalEngine.InsertHook());
+      hookManager.doHooks(queryContext, plan);
+
+      for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+        LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
+        if (scanNodes != null) {
+          for (LogicalNode eachScanNode : scanNodes) {
+            ScanNode scanNode = (ScanNode) eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+          }
+        }
+
+        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
+        if (scanNodes != null) {
+          for (LogicalNode eachScanNode : scanNodes) {
+            ScanNode scanNode = (ScanNode) eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+          }
+        }
+      }
+      masterPlan = new MasterPlan(queryId, queryContext, plan);
+      queryMasterContext.getGlobalPlanner().build(masterPlan);
+
+      query = new Query(queryTaskContext, queryId, querySubmitTime,
+          "", queryTaskContext.getEventHandler(), masterPlan);
+
+      dispatcher.register(QueryEventType.class, query);
+      queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      initError = t;
+    }
+  }
+
+  /**
+   * Creates the staging directory, verifies its ownership and
+   * permissions, and records it in the query context. If a final
+   * output path is set, it also checks that the path does not
+   * already exist (unless overwrite is enabled).
+   */
+  private void initStagingDir() throws IOException {
+
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi;
+    ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
+
+    Path stagingDir = null;
+    Path outputDir = null;
+    try {
+      ////////////////////////////////////////////
+      // Create Output Directory
+      ////////////////////////////////////////////
+
+      stagingDir = new Path(TajoConf.getStagingDir(systemConf), queryId.toString());
+
+      if (defaultFS.exists(stagingDir)) {
+        throw new IOException("The staging directory '" + stagingDir + "' already exists");
+      }
+      defaultFS.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+      FileStatus fsStatus = defaultFS.getFileStatus(stagingDir);
+      String owner = fsStatus.getOwner();
+
+      if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+        throw new IOException("The ownership on the user's query " +
+            "directory " + stagingDir + " is not as expected. " +
+            "It is owned by " + owner + ". The directory must " +
+            "be owned by the submitter " + currentUser + " or " +
+            "by " + realUser);
+      }
+
+      if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+        LOG.info("Permissions on staging directory " + stagingDir + " are " +
+            "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+            "to correct value " + STAGING_DIR_PERMISSION);
+        defaultFS.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+      }
+
+      // Create the result subdirectory
+      defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
+      LOG.info("The staging dir '" + stagingDir + "' is created.");
+      queryContext.setStagingDir(stagingDir);
+
+      /////////////////////////////////////////////////
+      // Check and Create Output Directory If Necessary
+      /////////////////////////////////////////////////
+      if (queryContext.hasOutputPath()) {
+        outputDir = queryContext.getOutputPath();
+        if (!queryContext.isOutputOverwrite()) {
+          if (defaultFS.exists(outputDir)) {
+            throw new IOException("The output directory '" + outputDir + " already exists.");
+          }
+        }
+      }
+    } catch (IOException ioe) {
+      if (stagingDir != null && defaultFS.exists(stagingDir)) {
+        defaultFS.delete(stagingDir, true);
+        LOG.info("The staging directory '" + stagingDir + "' is deleted");
+      }
+
+      throw ioe;
+    }
+  }
+
+  public Query getQuery() {
+    return query;
+  }
+
+  public void expiredSessionTimeout() {
+    stop();
+  }
+
+  public QueryMasterTaskContext getQueryTaskContext() {
+    return queryTaskContext;
+  }
+
+  public EventHandler getEventHandler() {
+    return queryTaskContext.getEventHandler();
+  }
+
+  public void touchSessionTime() {
+    this.lastClientHeartbeat.set(System.currentTimeMillis());
+  }
+
+  public long getLastClientHeartbeat() {
+    return this.lastClientHeartbeat.get();
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public boolean isInitError() {
+    return initError != null;
+  }
+
+  public QueryState getState() {
+    if(query == null) {
+      if (isInitError()) {
+        return QueryState.QUERY_ERROR;
+      } else {
+        return QueryState.QUERY_NOT_ASSIGNED;
+      }
+    } else {
+      return query.getState();
+    }
+  }
+
+  public String getErrorMessage() {
+    if (isInitError()) {
+      return StringUtils.stringifyException(initError);
+    } else {
+      return null;
+    }
+  }
+
+  public long getQuerySubmitTime() {
+    return this.querySubmitTime;
+  }
+
+  public class QueryMasterTaskContext {
+    EventHandler eventHandler;
+    public QueryMaster.QueryMasterContext getQueryMasterContext() {
+      return queryMasterContext;
+    }
+
+    public Session getSession() {
+      return session;
+    }
+
+    public QueryContext getQueryContext() {
+      return queryContext;
+    }
+
+    public TajoConf getConf() {
+      return systemConf;
+    }
+
+    public Clock getClock() {
+      return queryMasterContext.getClock();
+    }
+
+    public Query getQuery() {
+      return query;
+    }
+
+    public QueryId getQueryId() {
+      return queryId;
+    }
+
+    public AbstractStorageManager getStorageManager() {
+      return queryMasterContext.getStorageManager();
+    }
+
+    public Path getStagingDir() {
+      return queryContext.getStagingDir();
+    }
+
+    public synchronized EventHandler getEventHandler() {
+      if(eventHandler == null) {
+        eventHandler = dispatcher.getEventHandler();
+      }
+      return eventHandler;
+    }
+
+    public TajoAsyncDispatcher getDispatcher() {
+      return dispatcher;
+    }
+
+    public SubQuery getSubQuery(ExecutionBlockId id) {
+      return query.getSubQuery(id);
+    }
+
+    public Map<String, TableDesc> getTableDescMap() {
+      return tableDescMap;
+    }
+
+    public float getProgress() {
+      if(query == null) {
+        return 0.0f;
+      }
+      return query.getProgress();
+    }
+
+    public AbstractResourceAllocator getResourceAllocator() {
+      return resourceAllocator;
+    }
+
+    public TajoMetrics getQueryMetrics() {
+      return queryMetrics;
+    }
+  }
+}
\ No newline at end of file

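For orientation, a minimal sketch of how a QueryMasterTask is driven through the CompositeService lifecycle above. This is illustrative only, not part of the patch: ctx, queryId, session, queryContext, sql, and planJson are assumed to be supplied by the owning QueryMaster.

// Illustrative only: all inputs are assumed to come from the QueryMaster.
QueryMasterTask task = new QueryMasterTask(ctx, queryId, session,
    queryContext, sql, planJson);
task.init(systemConf);        // wires allocator, dispatcher, staging dir
if (task.isInitError()) {
  LOG.error(task.getErrorMessage());
} else {
  task.start();               // startQuery(): parse, plan, fire START event
}
// ... later, on query completion or session timeout:
task.stop();                  // notifies TajoMaster, reports query metrics
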
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
new file mode 100644
index 0000000..42fbf8a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -0,0 +1,658 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.FragmentPair;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
+public class QueryUnit implements EventHandler<TaskEvent> {
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(QueryUnit.class);
+
+  private final Configuration systemConf;
+	private QueryUnitId taskId;
+  private EventHandler eventHandler;
+	private StoreTableNode store = null;
+	private LogicalNode plan = null;
+	private List<ScanNode> scan;
+	
+	private Map<String, Set<FragmentProto>> fragMap;
+	private Map<String, Set<URI>> fetchMap;
+
+  private int totalFragmentNum;
+
+  private List<ShuffleFileOutput> shuffleFileOutputs;
+	private TableStats stats;
+  private final boolean isLeafTask;
+  private List<IntermediateEntry> intermediateData;
+
+  private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
+  private final int maxAttempts = 3;
+  private int nextAttempt = -1;
+  private QueryUnitAttemptId lastAttemptId;
+
+  private QueryUnitAttemptId successfulAttempt;
+  private String succeededHost;
+  private int succeededPullServerPort;
+
+  private int failedAttempts;
+  private int finishedAttempts; // total of succeeded, failed, and killed attempts
+
+  private long launchTime;
+  private long finishTime;
+
+  private List<DataLocation> dataLocations = Lists.newArrayList();
+
+  private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+
+  protected static final StateMachineFactory
+      <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
+      new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+          // Transitions from NEW state
+          .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+              TaskEventType.T_SCHEDULE,
+              new InitialScheduleTransition())
+          .addTransition(TaskState.NEW, TaskState.KILLED,
+              TaskEventType.T_KILL,
+              new KillNewTaskTransition())
+
+          // Transitions from SCHEDULED state
+          .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new AttemptLaunchedTransition())
+          .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+
+          // Transitions from RUNNING state
+          .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED)
+          .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              new AttemptSucceededTransition())
+          .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+          .addTransition(TaskState.RUNNING,
+              EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedOrRetryTransition())
+
+          // Transitions from KILL_WAIT state
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_KILLED,
+              ATTEMPT_KILLED_TRANSITION)
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new KillTaskTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              ATTEMPT_KILLED_TRANSITION)
+              // Ignore-able transitions.
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              EnumSet.of(
+                  TaskEventType.T_KILL,
+                  TaskEventType.T_SCHEDULE))
+
+          // Transitions from SUCCEEDED state
+          // Ignore-able transitions
+          .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+              EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+          // Transitions from FAILED state
+          // Ignore-able transitions
+          .addTransition(TaskState.FAILED, TaskState.FAILED,
+              EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+          .installTopology();
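+  // Note: the factory above is static and shared by all QueryUnit
+  // instances; each task builds its own StateMachine from it via
+  // stateMachineFactory.make(this) in the constructor, and handle()
+  // drives transitions under the write lock.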
+
+  private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
+
+
+  private final Lock readLock;
+  private final Lock writeLock;
+  private QueryUnitAttemptScheduleContext scheduleContext;
+
+	public QueryUnit(Configuration conf, QueryUnitAttemptScheduleContext scheduleContext,
+                   QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
+    this.systemConf = conf;
+		this.taskId = id;
+    this.eventHandler = eventHandler;
+    this.isLeafTask = isLeafTask;
+		scan = new ArrayList<ScanNode>();
+    fetchMap = Maps.newHashMap();
+    fragMap = Maps.newHashMap();
+    shuffleFileOutputs = new ArrayList<ShuffleFileOutput>();
+    attempts = Collections.emptyMap();
+    lastAttemptId = null;
+    nextAttempt = -1;
+    failedAttempts = 0;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+    this.scheduleContext = scheduleContext;
+
+    stateMachine = stateMachineFactory.make(this);
+    totalFragmentNum = 0;
+	}
+
+  public boolean isLeafTask() {
+    return this.isLeafTask;
+  }
+
+  public TaskState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+	public void setLogicalPlan(LogicalNode plan) {
+	  this.plan = plan;
+
+	  LogicalNode node = plan;
+	  ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+	  s.add(node);
+	  while (!s.isEmpty()) {
+	    node = s.remove(s.size()-1);
+	    if (node instanceof UnaryNode) {
+	      UnaryNode unary = (UnaryNode) node;
+	      s.add(unary.getChild()); // append, equivalent to s.add(s.size(), ...)
+	    } else if (node instanceof BinaryNode) {
+	      BinaryNode binary = (BinaryNode) node;
+	      s.add(binary.getLeftChild());
+	      s.add(binary.getRightChild());
+	    } else if (node instanceof ScanNode) {
+	      scan.add((ScanNode)node);
+	    } else if (node instanceof TableSubQueryNode) {
+        s.add(((TableSubQueryNode) node).getSubQuery());
+      }
+	  }
+	}
+
+  private void addDataLocation(FileFragment fragment) {
+    String[] hosts = fragment.getHosts();
+    int[] diskIds = fragment.getDiskIds();
+    for (int i = 0; i < hosts.length; i++) {
+      dataLocations.add(new DataLocation(hosts[i], diskIds[i]));
+    }
+  }
+
+  public void addFragment(FileFragment fragment, boolean useDataLocation) {
+    Set<FragmentProto> fragmentProtos;
+    if (fragMap.containsKey(fragment.getTableName())) {
+      fragmentProtos = fragMap.get(fragment.getTableName());
+    } else {
+      fragmentProtos = new HashSet<FragmentProto>();
+      fragMap.put(fragment.getTableName(), fragmentProtos);
+    }
+    fragmentProtos.add(fragment.getProto());
+    if (useDataLocation) {
+      addDataLocation(fragment);
+    }
+    totalFragmentNum++;
+  }
+
+  public void addFragments(Collection<FileFragment> fragments) {
+    for (FileFragment eachFragment: fragments) {
+      addFragment(eachFragment, false);
+    }
+  }
+
+  public void setFragment(FragmentPair[] fragmentPairs) {
+    for (FragmentPair eachFragmentPair : fragmentPairs) {
+      this.addFragment(eachFragmentPair.getLeftFragment(), true);
+      if (eachFragmentPair.getRightFragment() != null) {
+        this.addFragment(eachFragmentPair.getRightFragment(), true);
+      }
+    }
+  }
+
+  public List<DataLocation> getDataLocations() {
+    return dataLocations;
+  }
+
+  public String getSucceededHost() {
+    return succeededHost;
+  }
+	
+	public void addFetches(String tableId, Collection<URI> urilist) {
+	  Set<URI> uris;
+    if (fetchMap.containsKey(tableId)) {
+      uris = fetchMap.get(tableId);
+    } else {
+      uris = Sets.newHashSet();
+    }
+    uris.addAll(urilist);
+    fetchMap.put(tableId, uris);
+	}
+	
+	public void setFetches(Map<String, Set<URI>> fetches) {
+	  this.fetchMap.clear();
+	  this.fetchMap.putAll(fetches);
+	}
+
+  public Collection<FragmentProto> getAllFragments() {
+    Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>();
+    for (Set<FragmentProto> eachFragmentSet : fragMap.values()) {
+      fragmentProtos.addAll(eachFragmentSet);
+    }
+    return fragmentProtos;
+  }
+	
+	public LogicalNode getLogicalPlan() {
+	  return this.plan;
+	}
+	
+	public QueryUnitId getId() {
+		return taskId;
+	}
+	
+	public Collection<URI> getFetchHosts(String tableId) {
+	  return fetchMap.get(tableId);
+	}
+	
+	public Collection<Set<URI>> getFetches() {
+	  return fetchMap.values();
+	}
+
+  public Map<String, Set<URI>> getFetchMap() {
+    return fetchMap;
+  }
+	
+	public Collection<URI> getFetch(ScanNode scan) {
+	  return this.fetchMap.get(scan.getTableName());
+	}
+	
+	public ScanNode[] getScanNodes() {
+	  return this.scan.toArray(new ScanNode[scan.size()]);
+	}
+	
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder(plan.getType() + " \n");
+    for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
+      str.append(e.getKey()).append(" : ");
+      for (FragmentProto fragment : e.getValue()) {
+        str.append(fragment).append(", ");
+      }
+    }
+    for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
+      str.append(e.getKey()).append(" : ");
+      for (URI t : e.getValue()) {
+        str.append(t).append(" ");
+      }
+    }
+    return str.toString();
+  }
+	
+	public void setStats(TableStats stats) {
+	  this.stats = stats;
+	}
+	
+	public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) {
+	  this.shuffleFileOutputs = Collections.unmodifiableList(partitions);
+	}
+	
+	public TableStats getStats() {
+	  return this.stats;
+	}
+	
+	public List<ShuffleFileOutput> getShuffleFileOutputs() {
+	  return this.shuffleFileOutputs;
+	}
+	
+	public int getShuffleOutpuNum() {
+	  return this.shuffleFileOutputs.size();
+	}
+
+  public QueryUnitAttempt newAttempt() {
+    QueryUnitAttempt attempt = new QueryUnitAttempt(scheduleContext,
+        QueryIdFactory.newQueryUnitAttemptId(this.getId(), ++nextAttempt),
+        this, eventHandler);
+    lastAttemptId = attempt.getId();
+    return attempt;
+  }
+
+  public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
+    return attempts.get(attemptId);
+  }
+
+  public QueryUnitAttempt getAttempt(int attempt) {
+    return this.attempts.get(QueryIdFactory.newQueryUnitAttemptId(this.getId(), attempt));
+  }
+
+  public QueryUnitAttempt getLastAttempt() {
+    return getAttempt(this.lastAttemptId);
+  }
+
+  public QueryUnitAttempt getSuccessfulAttempt() {
+    readLock.lock();
+    try {
+      if (null == successfulAttempt) {
+        return null;
+      }
+      return attempts.get(successfulAttempt);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public int getRetryCount() {
+    return this.nextAttempt;
+  }
+
+  public int getTotalFragmentNum() {
+    return totalFragmentNum;
+  }
+
+  private static class InitialScheduleTransition implements
+    SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.addAndScheduleAttempt();
+    }
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  @VisibleForTesting
+  public void setLaunchTime(long launchTime) {
+    this.launchTime = launchTime;
+  }
+
+  @VisibleForTesting
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public long getRunningTime() {
+    if(finishTime > 0) {
+      return finishTime - launchTime;
+    } else {
+      return System.currentTimeMillis() - launchTime;
+    }
+  }
+
+  // Always called while holding the write lock
+  private void addAndScheduleAttempt() {
+    // Create new task attempt
+    QueryUnitAttempt attempt = newAttempt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created attempt " + attempt.getId());
+    }
+    switch (attempts.size()) {
+      case 0:
+        attempts = Collections.singletonMap(attempt.getId(), attempt);
+        break;
+
+      case 1:
+        Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
+            = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
+        newAttempts.putAll(attempts);
+        attempts = newAttempts;
+        attempts.put(attempt.getId(), attempt);
+        break;
+
+      default:
+        attempts.put(attempt.getId(), attempt);
+        break;
+    }
+
+    if (failedAttempts > 0) {
+      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
+          TaskAttemptEventType.TA_RESCHEDULE));
+    } else {
+      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+  }
+
+  private void finishTask() {
+    this.finishTime = System.currentTimeMillis();
+  }
+
+  private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+    }
+  }
+
+  private static class KillTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.finishTask();
+      task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL));
+    }
+  }
+
+  private static class AttemptKilledTransition implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent event) {
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+    }
+  }
+
+  private static class AttemptSucceededTransition
+      implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+    @Override
+    public void transition(QueryUnit task,
+                           TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
+
+      task.successfulAttempt = attemptEvent.getTaskAttemptId();
+      task.succeededHost = attempt.getHost();
+      task.succeededPullServerPort = attempt.getPullServerPort();
+
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
+    }
+  }
+
+  private static class AttemptLaunchedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+    @Override
+    public void transition(QueryUnit task,
+                           TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
+      task.launchTime = System.currentTimeMillis();
+      task.succeededHost = attempt.getHost();
+    }
+  }
+
+  private static class AttemptFailedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+    @Override
+    public void transition(QueryUnit task, TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      LOG.info("=============================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
+      LOG.info("=============================================================");
+      task.failedAttempts++;
+      task.finishedAttempts++;
+
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
+    }
+  }
+
+  private static class AttemptFailedOrRetryTransition implements
+    MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
+
+    @Override
+    public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
+      LOG.info("=============================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
+      LOG.info("=============================================================");
+      task.failedAttempts++;
+      task.finishedAttempts++;
+
+      if (task.failedAttempts < task.maxAttempts) {
+        if (task.successfulAttempt == null) {
+          task.addAndScheduleAttempt();
+        }
+      } else {
+        task.finishTask();
+        task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
+        return TaskState.FAILED;
+      }
+
+      return task.getState();
+    }
+  }
+
+  @Override
+  public void handle(TaskEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskId() + " of type "
+          + event.getType());
+    }
+
+    try {
+      writeLock.lock();
+      TaskState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      //notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void setIntermediateData(Collection<IntermediateEntry> partitions) {
+    this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
+  }
+
+  public List<IntermediateEntry> getIntermediateData() {
+    return this.intermediateData;
+  }
+
+  public static class IntermediateEntry {
+    int taskId;
+    int attemptId;
+    int partId;
+    String pullHost;
+    int port;
+
+    public IntermediateEntry(int taskId, int attemptId, int partId,
+                             String pullServerAddr, int pullServerPort) {
+      this.taskId = taskId;
+      this.attemptId = attemptId;
+      this.partId = partId;
+      this.pullHost = pullServerAddr;
+      this.port = pullServerPort;
+    }
+
+    public int getTaskId() {
+      return this.taskId;
+    }
+
+    public int getAttemptId() {
+      return this.attemptId;
+    }
+
+    public int getPartId() {
+      return this.partId;
+    }
+
+    public String getPullHost() {
+      return this.pullHost;
+    }
+
+    public int getPullPort() {
+      return port;
+    }
+
+    public String getPullAddress() {
+      return pullHost + ":" + port;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(taskId, attemptId, partId, pullHost, port);
+    }
+
+  }
+}

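A short, hypothetical driver for the task state machine declared above; task and attemptId stand in for objects created by the SubQuery scheduler, while the event constructors match those used elsewhere in this patch.

// NEW -> SCHEDULED: InitialScheduleTransition calls addAndScheduleAttempt(),
// creating attempt #0 and emitting TA_SCHEDULE (or TA_RESCHEDULE after a failure).
task.handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));

// SCHEDULED -> RUNNING once a worker launches the attempt.
task.handle(new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED));

// RUNNING -> SUCCEEDED: AttemptSucceededTransition records the winning host
// and pull-server port; a T_ATTEMPT_FAILED instead retries while
// failedAttempts < maxAttempts (3), then ends in FAILED.
task.handle(new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_SUCCEEDED));
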
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
new file mode 100644
index 0000000..7993ce9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
+public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
+
+  private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
+
+  private final static int EXPIRE_TIME = 15000;
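+  // EXPIRE_TIME is the attempt's heartbeat lease in milliseconds:
+  // updateExpireTime() subtracts each elapsed period, resetExpireTime()
+  // restores the full lease, and getLeftTime() exposes the remainder.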
+
+  private final QueryUnitAttemptId id;
+  private final QueryUnit queryUnit;
+  final EventHandler eventHandler;
+
+  private ContainerId containerId;
+  private String hostName;
+  private int port;
+  private int expire;
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  private final QueryUnitAttemptScheduleContext scheduleContext;
+
+  private float progress;
+  private CatalogProtos.TableStatsProto inputStats;
+  private CatalogProtos.TableStatsProto resultStats;
+
+  protected static final StateMachineFactory
+      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      stateMachineFactory = new StateMachineFactory
+      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      (TaskAttemptState.TA_NEW)
+
+      // Transitions from TA_NEW state
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new TaskKilledCompleteTransition())
+
+      // Transitions from TA_UNASSIGNED state
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new LaunchTransition())
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillUnassignedTaskTransition())
+
+      // Transitions from TA_ASSIGNED state
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED,
+          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      // Transitions from TA_RUNNING state
+      .addTransition(TaskAttemptState.TA_RUNNING,
+          EnumSet.of(TaskAttemptState.TA_RUNNING),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_LOCAL_KILLED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DONE,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR)
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          EnumSet.of(
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+              TaskAttemptEventType.TA_UPDATE))
+
+      // Transitions from TA_SUCCEEDED state
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_UPDATE)
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+       // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_KILL)
+
+      // Transitions from TA_KILLED state
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
+      // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_UPDATE))
+
+      .installTopology();
+
+  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+    stateMachine;
+
+
+  public QueryUnitAttempt(final QueryUnitAttemptScheduleContext scheduleContext,
+                          final QueryUnitAttemptId id, final QueryUnit queryUnit,
+                          final EventHandler eventHandler) {
+    this.scheduleContext = scheduleContext;
+    this.id = id;
+    this.expire = QueryUnitAttempt.EXPIRE_TIME;
+    this.queryUnit = queryUnit;
+    this.eventHandler = eventHandler;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public TaskAttemptState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public QueryUnitAttemptId getId() {
+    return this.id;
+  }
+
+  public boolean isLeafTask() {
+    return this.queryUnit.isLeafTask();
+  }
+
+  public QueryUnit getQueryUnit() {
+    return this.queryUnit;
+  }
+
+  public String getHost() {
+    return this.hostName;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public void setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  public void setHost(String host) {
+    this.hostName = host;
+  }
+
+  public void setPullServerPort(int port) {
+    this.port = port;
+  }
+
+  public int getPullServerPort() {
+    return port;
+  }
+
+  public synchronized void setExpireTime(int expire) {
+    this.expire = expire;
+  }
+
+  public synchronized void updateExpireTime(int period) {
+    this.setExpireTime(this.expire - period);
+  }
+
+  public synchronized void resetExpireTime() {
+    this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME);
+  }
+
+  public int getLeftTime() {
+    return this.expire;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public TableStats getInputStats() {
+    if (inputStats == null) {
+      return null;
+    }
+
+    return new TableStats(inputStats);
+  }
+
+  public TableStats getResultStats() {
+    if (resultStats == null) {
+      return null;
+    }
+    return new TableStats(resultStats);
+  }
+
+  private void fillTaskStatistics(TaskCompletionReport report) {
+    this.progress = 1.0f;
+
+    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+
+    if (report.getShuffleFileOutputsCount() > 0) {
+      this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
+
+      for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
+        IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
+            getId().getId(), p.getPartId(), getHost(), getPullServerPort());
+        partitions.add(entry);
+      }
+    }
+    this.getQueryUnit().setIntermediateData(partitions);
+
+    if (report.hasInputStats()) {
+      this.inputStats = report.getInputStats();
+    }
+    if (report.hasResultStats()) {
+      this.resultStats = report.getResultStats();
+      this.getQueryUnit().setStats(new TableStats(resultStats));
+    }
+  }
+
+  private static class TaskAttemptScheduleTransition implements
+      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
+          EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
+  private static class KillUnassignedTaskTransition implements
+      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
+          EventType.T_SCHEDULE_CANCEL, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
+  private static class LaunchTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+      taskAttempt.containerId = castEvent.getContainerId();
+      taskAttempt.setHost(castEvent.getHostName());
+      taskAttempt.setPullServerPort(castEvent.getPullServerPort());
+      taskAttempt.eventHandler.handle(
+          new TaskTAttemptEvent(taskAttempt.getId(),
+              TaskEventType.T_ATTEMPT_LAUNCHED));
+    }
+  }
+
+  private static class TaskKilledCompleteTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      taskAttempt.getQueryUnit().handle(new TaskEvent(taskAttempt.getId().getQueryUnitId(),
+          TaskEventType.T_ATTEMPT_KILLED));
+      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
+    }
+  }
+
+  private static class StatusUpdateTransition
+      implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
+
+    @Override
+    public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
+                                       TaskAttemptEvent event) {
+      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
+
+      taskAttempt.progress = updateEvent.getStatus().getProgress();
+      taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
+      taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
+
+      return TaskAttemptState.TA_RUNNING;
+    }
+  }
+
+  private void addDiagnosticInfo(String diag) {
+    if (diag != null && !diag.equals("")) {
+      diagnostics.add(diag);
+    }
+  }
+
+  private static class AlreadyAssignedTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(QueryUnitAttempt queryUnitAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+      // Intentionally a no-op: a duplicate TA_ASSIGNED for an already
+      // assigned attempt is ignored.
+    }
+  }
+
+  private static class AlreadyDoneTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(QueryUnitAttempt queryUnitAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+      // Intentionally a no-op: a TA_DONE arriving after the attempt has
+      // already succeeded is ignored.
+    }
+  }
+
+  private static class SucceededTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
+
+      try {
+        taskAttempt.fillTaskStatistics(report);
+        taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
+      } catch (Throwable t) {
+        taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
+      }
+    }
+  }
+
+  private static class KillTaskTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
+      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
+          LocalTaskEventType.KILL));
+    }
+  }
+
+  private static class FailedTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
+      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
+      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
+      LOG.error("FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
+    }
+  }
+
+  @Override
+  public void handle(TaskAttemptEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
+    }
+    try {
+      writeLock.lock();
+      TaskAttemptState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")", e);
+        eventHandler.handle(
+            new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
+                "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
+        eventHandler.handle(
+            new SubQueryEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
+                SubQueryEventType.SQ_INTERNAL_ERROR));
+      }
+
+      // notify the event handler of the state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}
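
The handle() method above follows a guarded-transition pattern: take the write lock, attempt the state transition, surface an invalid transition as an internal error, and log any state change. A minimal, self-contained sketch of that pattern, independent of Hadoop's StateMachine API (all names below are illustrative, not part of this commit):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class GuardedTransitionSketch {
      enum State { NEW, RUNNING, SUCCEEDED, FAILED }

      private State state = State.NEW;
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      void handle(String eventType) {
        lock.writeLock().lock();
        try {
          State oldState = state;
          try {
            state = transition(oldState, eventType);
          } catch (IllegalStateException e) {
            // analogous to catching InvalidStateTransitonException above:
            // report the problem and move to an error state
            state = State.FAILED;
          }
          if (oldState != state) {
            System.out.println("Transitioned from " + oldState + " to " + state);
          }
        } finally {
          lock.writeLock().unlock();
        }
      }

      private State transition(State from, String eventType) {
        if (from == State.NEW && "LAUNCH".equals(eventType)) return State.RUNNING;
        if (from == State.RUNNING && "SUCCEED".equals(eventType)) return State.SUCCEEDED;
        throw new IllegalStateException(eventType + " is invalid in state " + from);
      }

      public static void main(String[] args) {
        GuardedTransitionSketch s = new GuardedTransitionSketch();
        s.handle("LAUNCH");   // NEW -> RUNNING
        s.handle("SUCCEED");  // RUNNING -> SUCCEEDED
      }
    }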

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
new file mode 100644
index 0000000..31d433d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -0,0 +1,653 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
+import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.master.TaskSchedulerContext;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
+
+/**
+ * Repartitioner creates non-leaf tasks and shuffles intermediate data.
+ * It supports two repartition methods: hash repartition and range repartition.
+ */
+public class Repartitioner {
+  private static final Log LOG = LogFactory.getLog(Repartitioner.class);
+
+  private static final int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
+  private final static String UNKNOWN_HOST = "unknown";
+
+  public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
+      throws IOException {
+    MasterPlan masterPlan = subQuery.getMasterPlan();
+    ExecutionBlock execBlock = subQuery.getBlock();
+    QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
+    AbstractStorageManager storageManager = subQuery.getStorageManager();
+
+    ScanNode[] scans = execBlock.getScanNodes();
+
+    Path tablePath;
+    FileFragment[] fragments = new FileFragment[scans.length];
+    long[] stats = new long[scans.length];
+
+    // initialize variables from the child operators
+    for (int i = 0; i < scans.length; i++) {
+      TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
+      if (tableDesc == null) { // it is an intermediate result, not a table stored on storage
+        // TODO - to be fixed (wrong directory)
+        ExecutionBlock [] childBlocks = new ExecutionBlock[2];
+        childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
+        childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
+
+        tablePath = storageManager.getTablePath(scans[i].getTableName());
+        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getResultStats().getNumBytes();
+        fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+      } else {
+        tablePath = tableDesc.getPath();
+        try {
+          stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
+        } catch (PlanningException e) {
+          throw new IOException(e);
+        }
+
+        // If the table has no data, storageManager returns an empty fragment list.
+        // Check the list size before taking the first fragment; otherwise,
+        // get(0) would throw an IndexOutOfBoundsException.
+        List<FileFragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(), tablePath);
+        if (fileFragments.size() > 0) {
+          fragments[i] = fileFragments.get(0);
+        } else {
+          fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+        }
+      }
+    }
+
+    // If any input table of an inner join has no data,
+    // the join returns zero rows, so there is nothing to schedule.
+    JoinNode joinNode = PlannerUtil.findMostBottomNode(execBlock.getPlan(), NodeType.JOIN);
+    if (joinNode != null) {
+      if (joinNode.getJoinType().equals(JoinType.INNER)) {
+        for (int i = 0; i < stats.length; i++) {
+          if (stats[i] == 0) {
+            return;
+          }
+        }
+      }
+    }
+
+    // Assigning either fragments or fetch urls to query units
+    boolean isAllBroadcastTable = true;
+    int baseScanIdx = -1;
+    for (int i = 0; i < scans.length; i++) {
+      if (!execBlock.isBroadcastTable(scans[i].getCanonicalName())) {
+        isAllBroadcastTable = false;
+        baseScanIdx = i;
+      }
+    }
+
+    if (isAllBroadcastTable) {
+      LOG.info("[Distributed Join Strategy] : Immediate " +  fragments.length + " Way Join on Single Machine");
+      SubQuery.scheduleFragment(subQuery, fragments[0], Arrays.asList(Arrays.copyOfRange(fragments, 1, fragments.length)));
+      schedulerContext.setEstimatedTaskNum(1);
+    } else if (!execBlock.getBroadcastTables().isEmpty()) {
+      LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d",
+          scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
+      scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments);
+    } else {
+      LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
+      // The hash map is modeled as follows:
+      // <Part Id, <Table Name, Intermediate Data>>
+      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries = new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
+
+      // Grouping IntermediateData by a partition key and a table name
+      for (ScanNode scan : scans) {
+        SubQuery childSubQuery = masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
+        for (QueryUnit task : childSubQuery.getQueryUnits()) {
+          if (task.getIntermediateData() != null && !task.getIntermediateData().isEmpty()) {
+            for (IntermediateEntry intermEntry : task.getIntermediateData()) {
+              if (hashEntries.containsKey(intermEntry.getPartId())) {
+                Map<String, List<IntermediateEntry>> tbNameToInterm =
+                    hashEntries.get(intermEntry.getPartId());
+
+                if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
+                  tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
+                } else {
+                  tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
+                }
+              } else {
+                Map<String, List<IntermediateEntry>> tbNameToInterm =
+                    new HashMap<String, List<IntermediateEntry>>();
+                tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry));
+                hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
+              }
+            }
+          } else {
+            // if there is no intermediate data (empty table), make an empty entry
+            int emptyPartitionId = 0;
+            if (hashEntries.containsKey(emptyPartitionId)) {
+              Map<String, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(emptyPartitionId);
+              if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
+                tbNameToInterm.get(scan.getCanonicalName())
+                    .addAll(new ArrayList<IntermediateEntry>());
+              } else {
+                tbNameToInterm.put(scan.getCanonicalName(), new ArrayList<IntermediateEntry>());
+              }
+            } else {
+              Map<String, List<IntermediateEntry>> tbNameToInterm = new HashMap<String, List<IntermediateEntry>>();
+              tbNameToInterm.put(scan.getCanonicalName(), new ArrayList<IntermediateEntry>());
+              hashEntries.put(emptyPartitionId, tbNameToInterm);
+            }
+          }
+        }
+      }
+
+      // hashEntries can be empty if there is no input data.
+      // In that case, dividing by its size would throw an ArithmeticException,
+      // so the empty case is guarded explicitly below.
+      int [] avgSize = new int[2];
+      avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] / hashEntries.size());
+      avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] / hashEntries.size());
+      int bothFetchSize = avgSize[0] + avgSize[1];
+
+      // Get the desired number of join tasks according to the volume
+      // of the larger table
+      int largerIdx = stats[0] >= stats[1] ? 0 : 1;
+      int desiredJoinTaskVolume = subQuery.getContext().getConf().
+          getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
+
+      // calculate the number of tasks according to the data size
+      int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
+      LOG.info("Larger intermediate data is approximately " + mb + " MB");
+      // determine the number of tasks: one task per the configured volume (in MB)
+      int maxTaskNum = (int) Math.ceil((double) mb / desiredJoinTaskVolume);
+      LOG.info("The calculated number of tasks is " + maxTaskNum);
+      LOG.info("The number of total shuffle keys is " + hashEntries.size());
+      // the number of join tasks cannot be larger than the number of
+      // distinct partition ids.
+      int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
+      LOG.info("The determined number of join tasks is " + joinTaskNum);
+
+      SubQuery.scheduleFragment(subQuery, fragments[0], Arrays.asList(new FileFragment[]{fragments[1]}));
+
+      // Assign partitions to tasks in a round robin manner.
+      for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
+          : hashEntries.entrySet()) {
+        addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
+      }
+
+      schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum));
+      schedulerContext.setEstimatedTaskNum(joinTaskNum);
+    }
+  }
+
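
The join-task arithmetic above reduces to a few lines: convert the larger table's volume to megabytes, divide by the configured per-task volume, and cap the result at the number of distinct shuffle keys. A self-contained sketch of that computation (the 64 MB volume in the example is an assumed value for illustration; the real value comes from ConfVars.DIST_QUERY_JOIN_TASK_VOLUME):

    class JoinTaskCountSketch {
      static int joinTaskNum(long largerTableBytes, int shuffleKeyCount, int taskVolumeMb) {
        // guard the empty-input case, as scheduleFragmentsForJoinQuery does
        if (shuffleKeyCount == 0) {
          return 0;
        }
        int mb = (int) Math.ceil((double) largerTableBytes / 1048576);
        int maxTaskNum = (int) Math.ceil((double) mb / taskVolumeMb);
        // the join task count cannot exceed the number of distinct partition ids
        return Math.min(maxTaskNum, shuffleKeyCount);
      }

      public static void main(String[] args) {
        // a 1 GiB larger side, 500 distinct keys, 64 MB per task -> 16 tasks
        System.out.println(joinTaskNum(1L << 30, 500, 64));
      }
    }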
+  /**
+   * Creates fragments for all partitions of a partitioned table.
+   */
+  public static List<FileFragment> getFragmentsFromPartitionedTable(AbstractStorageManager sm,
+                                                                          ScanNode scan,
+                                                                          TableDesc table) throws IOException {
+    List<FileFragment> fragments = Lists.newArrayList();
+    PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+    fragments.addAll(sm.getSplits(
+        scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths()));
+    partitionsScan.setInputPaths(null);
+    return fragments;
+  }
+
+  private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
+                                                          int baseScanId, FileFragment[] fragments) throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    ScanNode[] scans = execBlock.getScanNodes();
+    //Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
+
+    for (int i = 0; i < scans.length; i++) {
+      if (i != baseScanId) {
+        scans[i].setBroadcastTable(true);
+      }
+    }
+
+    ScanNode scan = scans[baseScanId];
+    TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
+    TableMeta meta = desc.getMeta();
+
+    Collection<FileFragment> baseFragments;
+    if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+      baseFragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc);
+    } else {
+      baseFragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+          desc.getPath());
+    }
+
+    List<FileFragment> broadcastFragments = new ArrayList<FileFragment>();
+    for (int i = 0; i < fragments.length; i++) {
+      if (i != baseScanId) {
+        broadcastFragments.add(fragments[i]);
+      }
+    }
+    SubQuery.scheduleFragments(subQuery, baseFragments, broadcastFragments);
+    schedulerContext.setEstimatedTaskNum(baseFragments.size());
+  }
+
+  private static void addJoinShuffle(SubQuery subQuery, int partitionId,
+                                     Map<String, List<IntermediateEntry>> groupedPartitions) {
+    Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
+    for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) {
+      Map<String, List<IntermediateEntry>> requests;
+      if (groupedPartitions.containsKey(execBlock.getId().toString())) {
+        requests = mergeHashShuffleRequest(groupedPartitions.get(execBlock.getId().toString()));
+      } else {
+        return;
+      }
+      Set<URI> fetchURIs = TUtil.newHashSet();
+      for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
+        Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
+            execBlock.getId(),
+            partitionId, HASH_SHUFFLE,
+            requestPerNode.getValue());
+        fetchURIs.addAll(uris);
+      }
+      fetches.put(execBlock.getId().toString(), Lists.newArrayList(fetchURIs));
+    }
+    SubQuery.scheduleFetches(subQuery, fetches);
+  }
+
+  /**
+   * Merges the partition requests associated with each pull server's address.
+   * This reduces the number of TCP connections.
+   *
+   * @return key: pull server's address, value: a list of requests
+   */
+  private static Map<String, List<IntermediateEntry>> mergeHashShuffleRequest(List<IntermediateEntry> partitions) {
+    Map<String, List<IntermediateEntry>> mergedPartitions = new HashMap<String, List<IntermediateEntry>>();
+    for (IntermediateEntry partition : partitions) {
+      if (mergedPartitions.containsKey(partition.getPullAddress())) {
+        mergedPartitions.get(partition.getPullAddress()).add(partition);
+      } else {
+        mergedPartitions.put(partition.getPullAddress(), TUtil.newList(partition));
+      }
+    }
+
+    return mergedPartitions;
+  }
+
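
The effect of mergeHashShuffleRequest is easiest to see with concrete inputs: many intermediate entries collapse into one request list per pull server, so a fetcher opens one connection per host rather than one per entry. A small standalone sketch of the same grouping idiom (PullEntry stands in for IntermediateEntry):

    import java.util.*;

    class MergeByAddressSketch {
      static class PullEntry {  // stands in for IntermediateEntry
        final String pullAddress;
        PullEntry(String pullAddress) { this.pullAddress = pullAddress; }
      }

      static Map<String, List<PullEntry>> mergeByAddress(List<PullEntry> entries) {
        Map<String, List<PullEntry>> merged = new HashMap<String, List<PullEntry>>();
        for (PullEntry e : entries) {
          List<PullEntry> list = merged.get(e.pullAddress);
          if (list == null) {
            list = new ArrayList<PullEntry>();
            merged.put(e.pullAddress, list);
          }
          list.add(e);
        }
        return merged;
      }

      public static void main(String[] args) {
        List<PullEntry> entries = Arrays.asList(
            new PullEntry("host1:8080"), new PullEntry("host2:8080"),
            new PullEntry("host1:8080"), new PullEntry("host1:8080"));
        // four entries but only two distinct pull servers -> two request lists
        System.out.println(mergeByAddress(entries).size());  // prints 2
      }
    }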
+  public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
+                                                      MasterPlan masterPlan, SubQuery subQuery, int maxNum)
+      throws IOException {
+    DataChannel channel = masterPlan.getIncomingChannels(subQuery.getBlock().getId()).get(0);
+    if (channel.getShuffleType() == HASH_SHUFFLE) {
+      scheduleHashShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
+    } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
+      scheduleRangeShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
+    } else {
+      throw new InternalException("Unsupported shuffle type: " + channel.getShuffleType());
+    }
+  }
+
+  private static TableStats computeChildBlocksStats(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan,
+                                                    ExecutionBlockId parentBlockId) {
+    List<TableStats> tableStatsList = new ArrayList<TableStats>();
+    List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId);
+    for (ExecutionBlock childBlock : childBlocks) {
+      SubQuery childExecSM = context.getSubQuery(childBlock.getId());
+      tableStatsList.add(childExecSM.getResultStats());
+    }
+    return StatisticsUtil.aggregateTableStat(tableStatsList);
+  }
+
+  public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
+                                                  SubQuery subQuery, DataChannel channel, int maxNum)
+      throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    ScanNode scan = execBlock.getScanNodes()[0];
+    Path tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
+
+    ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0);
+    SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
+    SortSpec [] sortSpecs = sortNode.getSortKeys();
+    Schema sortSchema = new Schema(channel.getShuffleKeys());
+
+    // calculate the maximum number of query ranges
+    TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
+
+    // If the child blocks produced no data (e.g. an empty table in an inner join),
+    // the query returns zero rows and there is nothing to schedule.
+    if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) {
+      return;
+    }
+    TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
+    RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
+    BigDecimal card = partitioner.getTotalCardinality();
+
+    // if the range cardinality is less than the desired number of tasks,
+    // we set the number of tasks to the range cardinality.
+    int determinedTaskNum;
+    if (card.compareTo(new BigDecimal(maxNum)) < 0) {
+      LOG.info("The range cardinality (" + card
+          + ") is less than the desired number of tasks (" + maxNum + ")");
+      determinedTaskNum = card.intValue();
+    } else {
+      determinedTaskNum = maxNum;
+    }
+
+    LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum +
+        " sub ranges (total units: " + determinedTaskNum + ")");
+    TupleRange [] ranges = partitioner.partition(determinedTaskNum);
+
+    FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+    SubQuery.scheduleFragment(subQuery, dummyFragment);
+
+    List<String> basicFetchURIs = new ArrayList<String>();
+    List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
+    for (ExecutionBlock childBlock : childBlocks) {
+      SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
+      for (QueryUnit qu : childExecSM.getQueryUnits()) {
+        for (IntermediateEntry p : qu.getIntermediateData()) {
+          String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(), childBlock.getId(), p.taskId, p.attemptId);
+          basicFetchURIs.add(uri);
+        }
+      }
+    }
+
+    boolean ascendingFirstKey = sortSpecs[0].isAscending();
+    SortedMap<TupleRange, Collection<URI>> map;
+    if (ascendingFirstKey) {
+      map = new TreeMap<TupleRange, Collection<URI>>();
+    } else {
+      map = new TreeMap<TupleRange, Collection<URI>>(new TupleRange.DescendingTupleRangeComparator());
+    }
+
+    Set<URI> uris;
+    try {
+      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
+      for (int i = 0; i < ranges.length; i++) {
+        uris = new HashSet<URI>();
+        for (String uri: basicFetchURIs) {
+          String rangeParam =
+              TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder);
+          URI finalUri = URI.create(uri + "&" + rangeParam);
+          uris.add(finalUri);
+        }
+        map.put(ranges[i], uris);
+      }
+
+    } catch (UnsupportedEncodingException e) {
+      LOG.error(e);
+    }
+
+    scheduleFetchesByRoundRobin(subQuery, map, scan.getTableName(), determinedTaskNum);
+
+    schedulerContext.setEstimatedTaskNum(determinedTaskNum);
+  }
+
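
The task count chosen by scheduleRangeShuffledFetches is simply the total range cardinality capped at the desired maximum. A minimal sketch of that decision in isolation (the BigDecimal cardinality is taken as given here rather than computed by UniformRangePartition):

    import java.math.BigDecimal;

    class RangeTaskCountSketch {
      static int determinedTaskNum(BigDecimal totalCardinality, int maxNum) {
        // if the range cardinality is below the desired task count, use the cardinality
        if (totalCardinality.compareTo(new BigDecimal(maxNum)) < 0) {
          return totalCardinality.intValue();
        }
        return maxNum;
      }

      public static void main(String[] args) {
        System.out.println(determinedTaskNum(new BigDecimal(7), 32));    // 7
        System.out.println(determinedTaskNum(new BigDecimal(1000), 32)); // 32
      }
    }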
+  public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<URI>> partitions,
+                                                   String tableName, int num) {
+    int i;
+    Map<String, List<URI>>[] fetchesArray = new Map[num];
+    for (i = 0; i < num; i++) {
+      fetchesArray[i] = new HashMap<String, List<URI>>();
+    }
+    i = 0;
+    for (Entry<?, Collection<URI>> entry : partitions.entrySet()) {
+      Collection<URI> value = entry.getValue();
+      TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
+      if (i == num) i = 0;
+    }
+    for (Map<String, List<URI>> eachFetches : fetchesArray) {
+      SubQuery.scheduleFetches(subQuery, eachFetches);
+    }
+  }
+
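
scheduleFetchesByRoundRobin spreads partitions over a fixed number of task buckets by cycling an index, so bucket sizes differ by at most one. The same idiom in isolation (a plain list append replaces TUtil.putCollectionToNestedList):

    import java.util.*;

    class RoundRobinSketch {
      // distribute items over num buckets in a round-robin manner
      static <T> List<List<T>> roundRobin(Collection<T> items, int num) {
        List<List<T>> buckets = new ArrayList<List<T>>(num);
        for (int i = 0; i < num; i++) {
          buckets.add(new ArrayList<T>());
        }
        int i = 0;
        for (T item : items) {
          buckets.get(i++).add(item);
          if (i == num) i = 0;
        }
        return buckets;
      }

      public static void main(String[] args) {
        // seven partitions over three buckets -> sizes 3, 2, 2
        for (List<Integer> b : roundRobin(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3)) {
          System.out.println(b);
        }
      }
    }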
+  public static String createBasicFetchUri(String hostName, int port,
+                                           ExecutionBlockId childSid,
+                                           int taskId, int attemptId) {
+    String scheme = "http://";
+    StringBuilder sb = new StringBuilder(scheme);
+    sb.append(hostName).append(":").append(port).append("/?")
+        .append("qid=").append(childSid.getQueryId().toString())
+        .append("&sid=").append(childSid.getId())
+        .append("&").append("ta=").append(taskId).append("_").append(attemptId)
+        .append("&").append("p=0")
+        .append("&").append("type=r");
+
+    return sb.toString();
+  }
+
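
For reference, a URI built by createBasicFetchUri has the following shape; the host, port, and ids below are made-up example values, and type=r marks the request as a range-shuffle fetch:

    http://worker1:8080/?qid=q_1397817702000_0001&sid=3&ta=5_0&p=0&type=r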
+  public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
+                                                 SubQuery subQuery, DataChannel channel,
+                                                 int maxNum) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
+
+    if (totalStat.getNumRows() == 0) {
+      return;
+    }
+
+    ScanNode scan = execBlock.getScanNodes()[0];
+    Path tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
+
+    FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    fragments.add(frag);
+    SubQuery.scheduleFragments(subQuery, fragments);
+
+    Map<String, List<IntermediateEntry>> hashedByHost;
+    Map<Integer, Collection<URI>> finalFetchURI = new HashMap<Integer, Collection<URI>>();
+
+    for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
+      List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+      for (QueryUnit tasks : subQuery.getContext().getSubQuery(block.getId()).getQueryUnits()) {
+        if (tasks.getIntermediateData() != null) {
+          partitions.addAll(tasks.getIntermediateData());
+        }
+      }
+      Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
+      for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
+        hashedByHost = hashByHost(interm.getValue());
+        for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
+          Collection<URI> uris = createHashFetchURL(e.getKey(), block.getId(),
+              interm.getKey(), channel.getShuffleType(), e.getValue());
+
+          if (finalFetchURI.containsKey(interm.getKey())) {
+            finalFetchURI.get(interm.getKey()).addAll(uris);
+          } else {
+            finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
+          }
+        }
+      }
+    }
+
+    GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
+    // get a proper number of tasks
+    int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
+    LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size());
+    if (groupby != null && groupby.getGroupingColumns().length == 0) {
+      determinedTaskNum = 1;
+      LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
+    }
+
+    // set the proper number of tasks as the estimated task num
+    schedulerContext.setEstimatedTaskNum(determinedTaskNum);
+    // divide fetch uris into the proper number of tasks in a round robin manner.
+    scheduleFetchesByRoundRobin(subQuery, finalFetchURI, scan.getTableName(), determinedTaskNum);
+    LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
+  }
+
+  public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
+                                       int partitionId, ShuffleType type, List<IntermediateEntry> entries) {
+    String scheme = "http://";
+    StringBuilder urlPrefix = new StringBuilder(scheme);
+    urlPrefix.append(hostAndPort).append("/?")
+        .append("qid=").append(ebid.getQueryId().toString())
+        .append("&sid=").append(ebid.getId())
+        .append("&p=").append(partitionId)
+        .append("&type=");
+    if (type == HASH_SHUFFLE) {
+      urlPrefix.append("h");
+    } else if (type == RANGE_SHUFFLE) {
+      urlPrefix.append("r");
+    }
+    urlPrefix.append("&ta=");
+
+    // If a GET request is longer than about 2,000 characters, the long request
+    // URI may cause an HTTP 414 (Request-URI Too Long) response.
+    // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+    // The code below splits a long request into multiple requests.
+    List<String> taskIdsParams = new ArrayList<String>();
+    boolean first = true;
+    StringBuilder taskIdListBuilder = new StringBuilder();
+    for (IntermediateEntry entry: entries) {
+      StringBuilder taskAttemptId = new StringBuilder();
+
+      if (!first) { // prepend a comma for every entry after the first
+        taskAttemptId.append(",");
+      } else {
+        first = false;
+      }
+
+      taskAttemptId.append(entry.getTaskId()).append("_").
+          append(entry.getAttemptId());
+      if (taskIdListBuilder.length() + taskAttemptId.length()
+          > HTTP_REQUEST_MAXIMUM_LENGTH) {
+        taskIdsParams.add(taskIdListBuilder.toString());
+        taskIdListBuilder = new StringBuilder(entry.getTaskId() + "_" + entry.getAttemptId());
+      } else {
+        taskIdListBuilder.append(taskAttemptId);
+      }
+    }
+
+    // flush the remaining task ids, if any
+    if (taskIdListBuilder.length() > 0) {
+      taskIdsParams.add(taskIdListBuilder.toString());
+    }
+
+    Collection<URI> fetchURLs = new ArrayList<URI>();
+    for (String param : taskIdsParams) {
+      fetchURLs.add(URI.create(urlPrefix + param));
+    }
+
+    return fetchURLs;
+  }
+
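
The splitting above keeps each ta= parameter list under HTTP_REQUEST_MAXIMUM_LENGTH so that no generated GET request risks a 414 response. A compact standalone sketch of the same chunking idiom (the tiny cap in the example is only to force a split; the real cap is 1900 characters):

    import java.util.*;

    class UriChunkingSketch {
      static List<String> chunkParams(List<String> taskAttemptIds, int maxLen) {
        List<String> params = new ArrayList<String>();
        StringBuilder current = new StringBuilder();
        for (String id : taskAttemptIds) {
          // the +1 accounts for the joining comma
          if (current.length() > 0 && current.length() + id.length() + 1 > maxLen) {
            params.add(current.toString());
            current = new StringBuilder(id);
          } else {
            if (current.length() > 0) {
              current.append(",");
            }
            current.append(id);
          }
        }
        if (current.length() > 0) {
          params.add(current.toString());
        }
        return params;
      }

      public static void main(String[] args) {
        List<String> ids = Arrays.asList("0_0", "1_0", "2_0", "3_0");
        System.out.println(chunkParams(ids, 7));  // [0_0,1_0, 2_0,3_0]
      }
    }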
+  public static Map<Integer, List<IntermediateEntry>> hashByKey(List<IntermediateEntry> entries) {
+    Map<Integer, List<IntermediateEntry>> hashed = new HashMap<Integer, List<IntermediateEntry>>();
+    for (IntermediateEntry entry : entries) {
+      if (hashed.containsKey(entry.getPartId())) {
+        hashed.get(entry.getPartId()).add(entry);
+      } else {
+        hashed.put(entry.getPartId(), TUtil.newList(entry));
+      }
+    }
+
+    return hashed;
+  }
+
+  public static Map<String, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
+    Map<String, List<IntermediateEntry>> hashed = new HashMap<String, List<IntermediateEntry>>();
+
+    String hostName;
+    for (IntermediateEntry entry : entries) {
+      hostName = entry.getPullHost() + ":" + entry.getPullPort();
+      if (hashed.containsKey(hostName)) {
+        hashed.get(hostName).add(entry);
+      } else {
+        hashed.put(hostName, TUtil.newList(entry));
+      }
+    }
+
+    return hashed;
+  }
+
+  public static SubQuery setShuffleOutputNumForTwoPhase(SubQuery subQuery, final int desiredNum, DataChannel channel) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    Column[] keys;
+    // if the parent execution block is a join,
+    // set the shuffle output number for the current block
+    // TODO: union handling is required when a join has unions as its child
+    MasterPlan masterPlan = subQuery.getMasterPlan();
+    keys = channel.getShuffleKeys();
+    if (!masterPlan.isRoot(subQuery.getBlock()) ) {
+      ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
+      if (parentBlock.getPlan().getType() == NodeType.JOIN) {
+        channel.setShuffleOutputNum(desiredNum);
+      }
+    }
+
+    // set the partition number for group by and sort
+    if (channel.getShuffleType() == HASH_SHUFFLE) {
+      if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
+        keys = channel.getShuffleKeys();
+      }
+    } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
+      if (execBlock.getPlan().getType() == NodeType.SORT) {
+        SortNode sort = (SortNode) execBlock.getPlan();
+        keys = new Column[sort.getSortKeys().length];
+        for (int i = 0; i < keys.length; i++) {
+          keys[i] = sort.getSortKeys()[i].getSortKey();
+        }
+      }
+    }
+    if (keys != null) {
+      if (keys.length == 0) {
+        channel.setShuffleKeys(new Column[]{});
+        channel.setShuffleOutputNum(1);
+      } else {
+        channel.setShuffleKeys(keys);
+        channel.setShuffleOutputNum(desiredNum);
+      }
+    }
+    return subQuery;
+  }
+}

