tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [03/29] tajo git commit: TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)
Date Fri, 05 Dec 2014 08:21:07 GMT
TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)

Closes #262


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3d485ecb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3d485ecb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3d485ecb

Branch: refs/heads/hbase_storage
Commit: 3d485ecb0112af12258a5a2bdc4a400b8df4fae8
Parents: 0c97fc0
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Thu Nov 27 18:42:24 2014 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Thu Nov 27 18:45:18 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 tajo-core/pom.xml                               |   1 +
 .../org/apache/tajo/master/ContainerProxy.java  |  10 +-
 .../tajo/master/DefaultTaskScheduler.java       |  18 +-
 .../tajo/master/LaunchTaskRunnersEvent.java     |   5 +-
 .../apache/tajo/master/LazyTaskScheduler.java   |   9 +-
 .../apache/tajo/master/TajoContainerProxy.java  |  22 +-
 .../apache/tajo/master/TajoMasterService.java   |   5 +-
 .../tajo/master/TaskRunnerGroupEvent.java       |   8 +-
 .../tajo/master/container/TajoContainer.java    | 173 ++++++++++
 .../tajo/master/container/TajoContainerId.java  | 172 ++++++++++
 .../master/container/TajoConverterUtils.java    | 263 ++++++++++++++++
 .../master/container/TajoRecordFactory.java     |  31 ++
 .../container/TajoRecordFactoryPBImpl.java      | 104 ++++++
 .../container/TajoRecordFactoryProvider.java    |  70 +++++
 .../tajo/master/container/TajoRecords.java      |  39 +++
 .../impl/pb/TajoContainerIdPBImpl.java          | 100 ++++++
 .../tajo/master/event/LocalTaskEvent.java       |   9 +-
 .../event/QueryUnitAttemptScheduleEvent.java    |  10 +-
 .../event/SubQueryContainerAllocationEvent.java |   8 +-
 .../master/event/TaskAttemptAssignedEvent.java  |   8 +-
 .../tajo/master/event/TaskRequestEvent.java     |   8 +-
 .../master/querymaster/QueryInProgress.java     |   3 +-
 .../querymaster/QueryMasterManagerService.java  |   4 +-
 .../master/querymaster/QueryUnitAttempt.java    |   6 +-
 .../tajo/master/querymaster/SubQuery.java       |  17 +-
 .../apache/tajo/master/rm/TajoRMContext.java    |   8 +-
 .../tajo/master/rm/TajoWorkerContainer.java     |  15 +-
 .../tajo/master/rm/TajoWorkerContainerId.java   |  53 ++--
 .../master/rm/TajoWorkerResourceManager.java    |  63 ++--
 .../tajo/master/rm/WorkerResourceManager.java   |   4 +-
 .../tajo/worker/AbstractResourceAllocator.java  |  14 +-
 .../apache/tajo/worker/ResourceAllocator.java   |   6 +-
 .../tajo/worker/TajoResourceAllocator.java      | 101 +++---
 .../java/org/apache/tajo/worker/TaskRunner.java |  16 +-
 .../apache/tajo/worker/TaskRunnerHistory.java   |  14 +-
 .../src/main/proto/ContainerProtocol.proto      |  48 +++
 .../src/main/proto/QueryMasterProtocol.proto    |   3 +
 .../main/proto/ResourceTrackerProtocol.proto    |   3 +
 .../src/main/proto/TajoMasterProtocol.proto     |   7 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   5 +-
 .../tajo/master/rm/TestTajoResourceManager.java |  16 +-
 .../org/apache/tajo/storage/StorageManager.java |   1 +
 .../apache/tajo/storage/TestFileSystems.java    | 267 ++++++++--------
 .../java/org/apache/tajo/storage/s3/INode.java  | 124 --------
 .../storage/s3/InMemoryFileSystemStore.java     | 175 -----------
 .../apache/tajo/storage/s3/S3OutputStream.java  | 234 --------------
 .../tajo/storage/s3/SmallBlockS3FileSystem.java | 314 -------------------
 48 files changed, 1392 insertions(+), 1204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6521c1c..1657c20 100644
--- a/CHANGES
+++ b/CHANGES
@@ -68,6 +68,8 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)
+
     TAJO-1208: Failure of create table using textfile on hivemeta.
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index fce96e4..060ac1b 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -162,6 +162,7 @@
                 <argument>--proto_path=../tajo-client/src/main/proto</argument>
                 <argument>--proto_path=../tajo-plan/src/main/proto</argument>
                 <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/ContainerProtocol.proto</argument>
                 <argument>src/main/proto/ResourceTrackerProtocol.proto</argument>
                 <argument>src/main/proto/QueryMasterProtocol.proto</argument>
                 <argument>src/main/proto/TajoMasterProtocol.proto</argument>

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
index 59b071a..462de91 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -22,11 +22,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
 
 public abstract class ContainerProxy {
   protected static final Log LOG = LogFactory.getLog(ContainerProxy.class);
@@ -45,8 +45,8 @@ public abstract class ContainerProxy {
 
   protected ContainerState state;
   // store enough information to be able to cleanup the container
-  protected Container container;
-  protected ContainerId containerID;
+  protected TajoContainer container;
+  protected TajoContainerId containerID;
   protected String hostName;
   protected int port = -1;
 
@@ -54,7 +54,7 @@ public abstract class ContainerProxy {
   public abstract void stopContainer();
 
   public ContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf,
-                        ExecutionBlockId executionBlockId, Container container) {
+                        ExecutionBlockId executionBlockId, TajoContainer container) {
     this.context = context;
     this.conf = conf;
     this.state = ContainerState.PREP;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 62d4892..77e3257 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -41,6 +41,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.QueryUnitAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
@@ -338,7 +339,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume =
         Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>());
     /** A value is last assigned volume id for each task runner */
-    private HashMap<ContainerId, Integer> lastAssignedVolumeId = new HashMap<ContainerId, Integer>();
+    private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId,
+      Integer>();
     /**
      * A key is disk volume id, and a value is the load of this volume.
      * This load is measured by counting how many number of tasks are running.
@@ -378,7 +380,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
      *  2. unknown block or Non-splittable task in host
      *  3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null
      */
-    public synchronized QueryUnitAttemptId getLocalTask(ContainerId containerId) {
+    public synchronized QueryUnitAttemptId getLocalTask(TajoContainerId containerId) {
       int volumeId;
       QueryUnitAttemptId queryUnitAttemptId = null;
 
@@ -489,7 +491,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
      * @param volumeId Volume identifier
      * @return the volume load (i.e., how many running tasks use this volume)
      */
-    private synchronized int increaseConcurrency(ContainerId containerId, int volumeId) {
+    private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) {
 
       int concurrency = 1;
       if (diskVolumeLoads.containsKey(volumeId)) {
@@ -514,7 +516,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     /**
      * Decrease the count of running tasks of a certain task runner
      */
-    private synchronized void decreaseConcurrency(ContainerId containerId){
+    private synchronized void decreaseConcurrency(TajoContainerId containerId){
       Integer volumeId = lastAssignedVolumeId.get(containerId);
       if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
         Integer concurrency = diskVolumeLoads.get(volumeId);
@@ -552,11 +554,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       }
     }
 
-    public boolean isAssigned(ContainerId containerId){
+    public boolean isAssigned(TajoContainerId containerId){
       return lastAssignedVolumeId.containsKey(containerId);
     }
 
-    public boolean isRemote(ContainerId containerId){
+    public boolean isRemote(TajoContainerId containerId){
       Integer volumeId = lastAssignedVolumeId.get(containerId);
       if(volumeId == null || volumeId > REMOTE){
         return false;
@@ -647,7 +649,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
     public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
 
-    private QueryUnitAttemptId allocateLocalTask(String host, ContainerId containerId){
+    private QueryUnitAttemptId allocateLocalTask(String host, TajoContainerId containerId){
       HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
       if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
@@ -778,7 +780,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           }
         }
 
-        ContainerId containerId = taskRequest.getContainerId();
+        TajoContainerId containerId = taskRequest.getContainerId();
         LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
             "containerId=" + containerId);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
index 9a4a01d..e620afa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.master;
 
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.container.TajoContainer;
 
 import java.util.Collection;
 
@@ -29,7 +29,8 @@ public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent {
   private final String planJson;
 
   public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId,
-                                Collection<Container> containers, QueryContext queryContext, String planJson) {
+                                Collection<TajoContainer> containers, QueryContext queryContext,
+                                String planJson) {
     super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers);
     this.queryContext = queryContext;
     this.planJson = planJson;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index f7953e0..b2883cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -21,7 +21,6 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
@@ -38,6 +37,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.QueryUnitAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.FetchImpl;
@@ -246,7 +246,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
   }
 
   private static class DiskBalancer {
-    private HashMap<ContainerId, Integer> containerDiskMap = new HashMap<ContainerId, Integer>();
+    private HashMap<TajoContainerId, Integer> containerDiskMap = new HashMap<TajoContainerId,
+      Integer>();
     private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();
     private String host;
 
@@ -260,7 +261,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       }
     }
 
-    public Integer getDiskId(ContainerId containerId) {
+    public Integer getDiskId(TajoContainerId containerId) {
       if (!containerDiskMap.containsKey(containerId)) {
         assignVolumeId(containerId);
       }
@@ -268,7 +269,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       return containerDiskMap.get(containerId);
     }
 
-    public void assignVolumeId(ContainerId containerId){
+    public void assignVolumeId(TajoContainerId containerId){
       Map.Entry<Integer, Integer> volumeEntry = null;
 
       for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index c236c20..158316e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -20,17 +20,17 @@ package org.apache.tajo.master;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.rpc.NettyClientBase;
@@ -47,7 +47,7 @@ public class TajoContainerProxy extends ContainerProxy {
   private final String planJson;
 
   public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
-                            Configuration conf, Container container,
+                            Configuration conf, TajoContainer container,
                             QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
     super(context, conf, executionBlockId, container);
     this.queryContext = queryContext;
@@ -89,7 +89,7 @@ public class TajoContainerProxy extends ContainerProxy {
     }
   }
 
-  private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
+  private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContainer container) {
     NettyClientBase tajoWorkerRpc = null;
     try {
       InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
@@ -149,8 +149,8 @@ public class TajoContainerProxy extends ContainerProxy {
 
   public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
                                            ExecutionBlockId executionBlockId,
-                                           ContainerId containerId) throws Exception {
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+                                           TajoContainerId containerId) throws Exception {
+    List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>();
     containerIds.add(containerId);
 
     releaseWorkerResource(context, executionBlockId, containerIds);
@@ -158,11 +158,11 @@ public class TajoContainerProxy extends ContainerProxy {
 
   public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
                                            ExecutionBlockId executionBlockId,
-                                           List<ContainerId> containerIds) throws Exception {
-    List<YarnProtos.ContainerIdProto> containerIdProtos =
-        new ArrayList<YarnProtos.ContainerIdProto>();
+                                           List<TajoContainerId> containerIds) throws Exception {
+    List<ContainerProtocol.TajoContainerIdProto> containerIdProtos =
+        new ArrayList<ContainerProtocol.TajoContainerIdProto>();
 
-    for(ContainerId eachContainerId: containerIds) {
+    for(TajoContainerId eachContainerId: containerIds) {
       containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index ddf24d3..1e3501c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.querymaster.QueryJobManager;
@@ -128,9 +129,9 @@ public class TajoMasterService extends AbstractService {
     public void releaseWorkerResource(RpcController controller,
                                            TajoMasterProtocol.WorkerResourceReleaseRequest request,
                                            RpcCallback<PrimitiveProtos.BoolProto> done) {
-      List<YarnProtos.ContainerIdProto> containerIds = request.getContainerIdsList();
+      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
 
-      for(YarnProtos.ContainerIdProto eachContainer: containerIds) {
+      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
         context.getResourceManager().releaseWorkerResource(eachContainer);
       }
       done.run(BOOL_TRUE);

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
index 1e6655c..c1c6522 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
@@ -18,10 +18,10 @@
 
 package org.apache.tajo.master;
 
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.container.TajoContainer;
 
 import java.util.Collection;
 
@@ -32,16 +32,16 @@ public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
   }
 
   protected final ExecutionBlockId executionBlockId;
-  protected final Collection<Container> containers;
+  protected final Collection<TajoContainer> containers;
   public TaskRunnerGroupEvent(EventType eventType,
                               ExecutionBlockId executionBlockId,
-                              Collection<Container> containers) {
+                              Collection<TajoContainer> containers) {
     super(eventType);
     this.executionBlockId = executionBlockId;
     this.containers = containers;
   }
 
-  public Collection<Container> getContainers() {
+  public Collection<TajoContainer> getContainers() {
     return containers;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
new file mode 100644
index 0000000..77562b5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
+ *
+ * <p><code>TajoContainer</code> represents an allocated resource in the cluster.
+ * </p>
+ *
+ * <p>The <code>ResourceManager</code> is the sole authority to allocate any
+ * <code>TajoContainer</code> to applications. The allocated <code>TajoContainer</code>
+ * is always on a single node and has a unique {@link org.apache.tajo.master.container.TajoContainerId}. It has
+ * a specific amount of {@link org.apache.hadoop.yarn.api.records.Resource} allocated.</p>
+ *
+ * <p>It includes details such as:
+ *   <ul>
+ *     <li>{@link org.apache.tajo.master.container.TajoContainerId} for the container, which is globally unique.</li>
+ *     <li>
+ *       {@link org.apache.hadoop.yarn.api.records.NodeId} of the node on which it is allocated.
+ *     </li>
+ *     <li>HTTP uri of the node.</li>
+ *     <li>{@link org.apache.hadoop.yarn.api.records.Resource} allocated to the container.</li>
+ *     <li>{@link org.apache.hadoop.yarn.api.records.Priority} at which the container was allocated.</li>
+ *     <li>
+ *       TajoContainer {@link org.apache.hadoop.yarn.api.records.Token} of the container, used to securely verify
+ *       authenticity of the allocation. 
+ *     </li>
+ *   </ul>
+ * </p>
+ *
+ * <p>Typically, an <code>ApplicationMaster</code> receives the 
+ * <code>TajoContainer</code> from the <code>ResourceManager</code> during
+ * resource-negotiation and then talks to the <code>NodeManager</code> to 
+ * start/stop containers.</p>
+ *
+ * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
+ * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
+ * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#stopContainers(org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest)
+ */
+@Public
+@Stable
+public abstract class TajoContainer implements Comparable<TajoContainer> {
+
+  @Private
+  @Unstable
+  public static TajoContainer newInstance(TajoContainerId containerId, NodeId nodeId,
+                                      String nodeHttpAddress, Resource resource, Priority priority,
+                                      Token containerToken) {
+    TajoContainer container = Records.newRecord(TajoContainer.class);
+    container.setId(containerId);
+    container.setNodeId(nodeId);
+    container.setNodeHttpAddress(nodeHttpAddress);
+    container.setResource(resource);
+    container.setPriority(priority);
+    container.setContainerToken(containerToken);
+    return container;
+  }
+
+  /**
+   * Get the globally unique identifier for the container.
+   * @return globally unique identifier for the container
+   */
+  @Public
+  @Stable
+  public abstract TajoContainerId getId();
+
+  @Private
+  @Unstable
+  public abstract void setId(TajoContainerId id);
+
+  /**
+   * Get the identifier of the node on which the container is allocated.
+   * @return identifier of the node on which the container is allocated
+   */
+  @Public
+  @Stable
+  public abstract NodeId getNodeId();
+
+  @Private
+  @Unstable
+  public abstract void setNodeId(NodeId nodeId);
+
+  /**
+   * Get the http uri of the node on which the container is allocated.
+   * @return http uri of the node on which the container is allocated
+   */
+  @Public
+  @Stable
+  public abstract String getNodeHttpAddress();
+
+  @Private
+  @Unstable
+  public abstract void setNodeHttpAddress(String nodeHttpAddress);
+
+  /**
+   * Get the <code>Resource</code> allocated to the container.
+   * @return <code>Resource</code> allocated to the container
+   */
+  @Public
+  @Stable
+  public abstract Resource getResource();
+
+  @Private
+  @Unstable
+  public abstract void setResource(Resource resource);
+
+  /**
+   * Get the <code>Priority</code> at which the <code>TajoContainer</code> was
+   * allocated.
+   * @return <code>Priority</code> at which the <code>TajoContainer</code> was
+   *         allocated
+   */
+  @Public
+  @Stable
+  public abstract Priority getPriority();
+
+  @Private
+  @Unstable
+  public abstract void setPriority(Priority priority);
+
+  /**
+   * Get the <code>TajoContainerToken</code> for the container.
+   * <p><code>TajoContainerToken</code> is the security token used by the framework
+   * to verify authenticity of any <code>TajoContainer</code>.</p>
+   *
+   * <p>The <code>ResourceManager</code>, on container allocation provides a
+   * secure token which is verified by the <code>NodeManager</code> on
+   * container launch.</p>
+   *
+   * <p>Applications do not need to care about <code>TajoContainerToken</code>, they
+   * are transparently handled by the framework - the allocated
+   * <code>TajoContainer</code> includes the <code>TajoContainerToken</code>.</p>
+   *
+   * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
+   * @see org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
+   *
+   * @return <code>TajoContainerToken</code> for the container
+   */
+  @Public
+  @Stable
+  public abstract Token getContainerToken();
+
+  @Private
+  @Unstable
+  public abstract void setContainerToken(Token containerToken);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
new file mode 100644
index 0000000..0de5fe0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+import java.text.NumberFormat;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java
+ *
+ * <p><code>TajoContainerId</code> represents a globally unique identifier
+ * for a {@link org.apache.tajo.master.container.TajoContainer} in the cluster.</p>
+ */
+@Public
+@Stable
+public abstract class TajoContainerId implements Comparable<TajoContainerId>{
+
+  @Private
+  @Unstable
+  public static TajoContainerId newInstance(ApplicationAttemptId appAttemptId,
+                                        int containerId) {
+    TajoContainerId id = TajoRecords.newRecord(TajoContainerId.class);
+    id.setId(containerId);
+    id.setApplicationAttemptId(appAttemptId);
+    id.build();
+    return id;
+  }
+
+  /**
+   * Get the <code>ApplicationAttemptId</code> of the application to which the
+   * <code>Container</code> was assigned.
+   * <p>
+   * Note: If containers are kept alive across application attempts via
+   * {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)}
+   * the <code>TajoContainerId</code> does not necessarily contain the current
+   * running application attempt's <code>ApplicationAttemptId</code> This
+   * container can be allocated by previously exited application attempt and
+   * managed by the current running attempt thus have the previous application
+   * attempt's <code>ApplicationAttemptId</code>.
+   * </p>
+   *
+   * @return <code>ApplicationAttemptId</code> of the application to which the
+   *         <code>Container</code> was assigned
+   */
+  @Public
+  @Stable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  @Private
+  @Unstable
+  protected abstract void setApplicationAttemptId(ApplicationAttemptId atId);
+
+  /**
+   * Get the identifier of the <code>TajoContainerId</code>.
+   * @return identifier of the <code>TajoContainerId</code>
+   */
+  @Public
+  @Stable
+  public abstract int getId();
+
+  @Private
+  @Unstable
+  protected abstract void setId(int id);
+
+
+  // TODO: fail the app submission if attempts are more than 10 or something
+  private static final ThreadLocal<NumberFormat> appAttemptIdFormat =
+    new ThreadLocal<NumberFormat>() {
+      @Override
+      public NumberFormat initialValue() {
+        NumberFormat fmt = NumberFormat.getInstance();
+        fmt.setGroupingUsed(false);
+        fmt.setMinimumIntegerDigits(2);
+        return fmt;
+      }
+    };
+  // TODO: Why thread local?
+  // ^ NumberFormat instances are not threadsafe
+  private static final ThreadLocal<NumberFormat> containerIdFormat =
+    new ThreadLocal<NumberFormat>() {
+      @Override
+      public NumberFormat initialValue() {
+        NumberFormat fmt = NumberFormat.getInstance();
+        fmt.setGroupingUsed(false);
+        fmt.setMinimumIntegerDigits(6);
+        return fmt;
+      }
+    };
+
+  @Override
+  public int hashCode() {
+    // Generated by eclipse.
+    final int prime = 435569;
+    int result = 7507;
+    result = prime * result + getId();
+    result = prime * result + getApplicationAttemptId().hashCode();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TajoContainerId other = (TajoContainerId) obj;
+    if (!this.getApplicationAttemptId().equals(other.getApplicationAttemptId()))
+      return false;
+    if (this.getId() != other.getId())
+      return false;
+    return true;
+  }
+
+  @Override
+  public int compareTo(TajoContainerId other) {
+    if (this.getApplicationAttemptId().compareTo(
+      other.getApplicationAttemptId()) == 0) {
+      return this.getId() - other.getId();
+    } else {
+      return this.getApplicationAttemptId().compareTo(
+        other.getApplicationAttemptId());
+    }
+
+  }
+
+  @Override
+  public String toString() {
+    NumberFormat fmt = NumberFormat.getInstance();
+    fmt.setGroupingUsed(false);
+    fmt.setMinimumIntegerDigits(4);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("container_");
+    ApplicationId appId = getApplicationAttemptId().getApplicationId();
+    sb.append(appId.getClusterTimestamp()).append("_");
+    sb.append(fmt.format(appId.getId()))
+      .append("_");
+    sb.append(
+      appAttemptIdFormat.get().format(
+        getApplicationAttemptId().getAttemptId())).append("_");
+    sb.append(containerIdFormat.get().format(getId()));
+    return sb.toString();
+  }
+
+  protected abstract void build();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
new file mode 100644
index 0000000..a6db654
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.master.container;
+
+
+import static org.apache.hadoop.yarn.util.StringHelper._split;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
+ *
+ * This class contains a set of utilities which help converting data structures
+ * from/to 'serializableFormat' to/from hadoop/nativejava data structures.
+ *
+ */
+@Private
+public class TajoConverterUtils {
+
+  public static final String APPLICATION_PREFIX = "application";
+  public static final String CONTAINER_PREFIX = "container";
+  public static final String APPLICATION_ATTEMPT_PREFIX = "appattempt";
+
+  /**
+   * return a hadoop path from a given url
+   *
+   * @param url
+   *          url to convert
+   * @return path from {@link URL}
+   * @throws URISyntaxException
+   */
+  public static Path getPathFromYarnURL(URL url) throws URISyntaxException {
+    String scheme = url.getScheme() == null ? "" : url.getScheme();
+
+    String authority = "";
+    if (url.getHost() != null) {
+      authority = url.getHost();
+      if (url.getUserInfo() != null) {
+        authority = url.getUserInfo() + "@" + authority;
+      }
+      if (url.getPort() > 0) {
+        authority += ":" + url.getPort();
+      }
+    }
+
+    return new Path(
+      (new URI(scheme, authority, url.getFile(), null, null)).normalize());
+  }
+
+  /**
+   * change from CharSequence to string for map key and value
+   * @param env map for converting
+   * @return string,string map
+   */
+  public static Map<String, String> convertToString(
+    Map<CharSequence, CharSequence> env) {
+
+    Map<String, String> stringMap = new HashMap<String, String>();
+    for (Entry<CharSequence, CharSequence> entry: env.entrySet()) {
+      stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+    }
+    return stringMap;
+  }
+
+  public static URL getYarnUrlFromPath(Path path) {
+    return getYarnUrlFromURI(path.toUri());
+  }
+
+  public static URL getYarnUrlFromURI(URI uri) {
+    URL url = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(URL.class);
+    if (uri.getHost() != null) {
+      url.setHost(uri.getHost());
+    }
+    if (uri.getUserInfo() != null) {
+      url.setUserInfo(uri.getUserInfo());
+    }
+    url.setPort(uri.getPort());
+    url.setScheme(uri.getScheme());
+    url.setFile(uri.getPath());
+    return url;
+  }
+
+  public static String toString(ApplicationId appId) {
+    return appId.toString();
+  }
+
+  public static ApplicationId toApplicationId(RecordFactory recordFactory,
+                                              String appIdStr) {
+    Iterator<String> it = _split(appIdStr).iterator();
+    it.next(); // prefix. TODO: Validate application prefix
+    return toApplicationId(recordFactory, it);
+  }
+
+  private static ApplicationId toApplicationId(RecordFactory recordFactory,
+                                               Iterator<String> it) {
+    ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
+      Integer.parseInt(it.next()));
+    return appId;
+  }
+
+  private static ApplicationAttemptId toApplicationAttemptId(
+    Iterator<String> it) throws NumberFormatException {
+    ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
+      Integer.parseInt(it.next()));
+    ApplicationAttemptId appAttemptId =
+      ApplicationAttemptId.newInstance(appId, Integer.parseInt(it.next()));
+    return appAttemptId;
+  }
+
+  private static ApplicationId toApplicationId(
+    Iterator<String> it) throws NumberFormatException {
+    ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
+      Integer.parseInt(it.next()));
+    return appId;
+  }
+
+  public static String toString(TajoContainerId cId) {
+    return cId == null ? null : cId.toString();
+  }
+
+  public static NodeId toNodeId(String nodeIdStr) {
+    String[] parts = nodeIdStr.split(":");
+    if (parts.length != 2) {
+      throw new IllegalArgumentException("Invalid NodeId [" + nodeIdStr
+        + "]. Expected host:port");
+    }
+    try {
+      NodeId nodeId =
+        NodeId.newInstance(parts[0], Integer.parseInt(parts[1]));
+      return nodeId;
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid port: " + parts[1], e);
+    }
+  }
+
+  public static TajoContainerId toTajoContainerId(String containerIdStr) {
+    Iterator<String> it = _split(containerIdStr).iterator();
+    if (!it.next().equals(CONTAINER_PREFIX)) {
+      throw new IllegalArgumentException("Invalid TajoContainerId prefix: "
+        + containerIdStr);
+    }
+    try {
+      ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
+      TajoContainerId containerId =
+        TajoContainerId.newInstance(appAttemptID, Integer.parseInt(it.next()));
+      return containerId;
+    } catch (NumberFormatException n) {
+      throw new IllegalArgumentException("Invalid TajoContainerId: "
+        + containerIdStr, n);
+    }
+  }
+
+  public static ApplicationAttemptId toApplicationAttemptId(
+    String applicationAttmeptIdStr) {
+    Iterator<String> it = _split(applicationAttmeptIdStr).iterator();
+    if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) {
+      throw new IllegalArgumentException("Invalid AppAttemptId prefix: "
+        + applicationAttmeptIdStr);
+    }
+    try {
+      return toApplicationAttemptId(it);
+    } catch (NumberFormatException n) {
+      throw new IllegalArgumentException("Invalid AppAttemptId: "
+        + applicationAttmeptIdStr, n);
+    }
+  }
+
+  public static ApplicationId toApplicationId(
+    String appIdStr) {
+    Iterator<String> it = _split(appIdStr).iterator();
+    if (!it.next().equals(APPLICATION_PREFIX)) {
+      throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+        + appIdStr + ". The valid ApplicationId should start with prefix "
+        + APPLICATION_PREFIX);
+    }
+    try {
+      return toApplicationId(it);
+    } catch (NumberFormatException n) {
+      throw new IllegalArgumentException("Invalid AppAttemptId: "
+        + appIdStr, n);
+    }
+  }
+
+  /**
+   * Convert a protobuf token into a rpc token and set its service. Supposed
+   * to be used for tokens other than RMDelegationToken. For
+   * RMDelegationToken, use
+   * {@link #convertFromYarn(org.apache.hadoop.yarn.api.records.Token,
+   * org.apache.hadoop.io.Text)} instead.
+   *
+   * @param protoToken the yarn token
+   * @param serviceAddr the connect address for the service
+   * @return rpc token
+   */
+  public static <T extends TokenIdentifier> Token<T> convertFromYarn(
+    org.apache.hadoop.yarn.api.records.Token protoToken,
+    InetSocketAddress serviceAddr) {
+    Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
+      protoToken.getPassword().array(),
+      new Text(protoToken.getKind()),
+      new Text(protoToken.getService()));
+    if (serviceAddr != null) {
+      SecurityUtil.setTokenService(token, serviceAddr);
+    }
+    return token;
+  }
+
+  /**
+   * Convert a protobuf token into a rpc token and set its service.
+   *
+   * @param protoToken the yarn token
+   * @param service the service for the token
+   */
+  public static <T extends TokenIdentifier> Token<T> convertFromYarn(
+    org.apache.hadoop.yarn.api.records.Token protoToken,
+    Text service) {
+    Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
+      protoToken.getPassword().array(),
+      new Text(protoToken.getKind()),
+      new Text(protoToken.getService()));
+
+    if (service != null) {
+      token.setService(service);
+    }
+    return token;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java
new file mode 100644
index 0000000..2fd8697
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/factories/RecordFactory.java
+ *
+ */
+@Unstable
+public interface TajoRecordFactory {
+  public <T> T newRecordInstance(Class<T> clazz);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java
new file mode 100644
index 0000000..c352a28
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryPBImpl.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tajo.master.container;
+
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.tajo.master.container.TajoRecordFactory;
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RecordFactoryPBImpl.java
+ */
+@Private
+public class TajoRecordFactoryPBImpl implements TajoRecordFactory {
+
+  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
+  private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";
+
+  private static final TajoRecordFactoryPBImpl self = new TajoRecordFactoryPBImpl();
+  private Configuration localConf = new Configuration();
+  private ConcurrentMap<Class<?>, Constructor<?>> cache = new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  private TajoRecordFactoryPBImpl() {
+  }
+
+  public static TajoRecordFactory get() {
+    return self;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> T newRecordInstance(Class<T> clazz) {
+
+    Constructor<?> constructor = cache.get(clazz);
+    if (constructor == null) {
+      Class<?> pbClazz = null;
+      try {
+        pbClazz = localConf.getClassByName(getPBImplClassName(clazz));
+      } catch (ClassNotFoundException e) {
+        throw new YarnRuntimeException("Failed to load class: ["
+          + getPBImplClassName(clazz) + "]", e);
+      }
+      try {
+        constructor = pbClazz.getConstructor();
+        constructor.setAccessible(true);
+        cache.putIfAbsent(clazz, constructor);
+      } catch (NoSuchMethodException e) {
+        throw new YarnRuntimeException("Could not find 0 argument constructor", e);
+      }
+    }
+    try {
+      Object retObject = constructor.newInstance();
+      return (T)retObject;
+    } catch (InvocationTargetException e) {
+      throw new YarnRuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new YarnRuntimeException(e);
+    } catch (InstantiationException e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  private String getPBImplClassName(Class<?> clazz) {
+    String srcPackagePart = getPackageName(clazz);
+    String srcClassName = getClassName(clazz);
+    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
+    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
+    return destPackagePart + "." + destClassPart;
+  }
+
+  private String getClassName(Class<?> clazz) {
+    String fqName = clazz.getName();
+    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
+  }
+
+  private String getPackageName(Class<?> clazz) {
+    return clazz.getPackage().getName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java
new file mode 100644
index 0000000..c260e85
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecordFactoryProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java
+ */
+@Unstable
+public class TajoRecordFactoryProvider {
+  private static Configuration defaultConf;
+
+  static {
+    defaultConf = new Configuration();
+  }
+
+  private TajoRecordFactoryProvider() {
+  }
+
+  public static TajoRecordFactory getRecordFactory(Configuration conf) {
+    if (conf == null) {
+      //Assuming the default configuration has the correct factories set.
+      //Users can specify a particular factory by providing a configuration.
+      conf = defaultConf;
+    }
+    return (TajoRecordFactory) getFactoryClassInstance(TajoRecordFactoryPBImpl.class.getCanonicalName());
+  }
+
+  private static Object getFactoryClassInstance(String factoryClassName) {
+    try {
+      Class<?> clazz = Class.forName(factoryClassName);
+      Method method = clazz.getMethod("get", null);
+      method.setAccessible(true);
+      return method.invoke(null, null);
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      throw new YarnRuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new YarnRuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java
new file mode 100644
index 0000000..e85edf9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoRecords.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container;
+
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/util/Records.java
+ *
+ * Convenient API record utils
+ */
+@Unstable
+public class TajoRecords {
+  // The default record factory
+  private static final TajoRecordFactory factory =
+    TajoRecordFactoryProvider.getRecordFactory(null);
+
+  public static <T> T newRecord(Class<T> cls) {
+    return factory.newRecordInstance(cls);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java
new file mode 100644
index 0000000..9d31050
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/container/impl/pb/TajoContainerIdPBImpl.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.container.impl.pb;
+
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
+
+/**
+ * This class is borrowed from the following source code :
+ * ${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
+ *
+ */
+@Private
+@Unstable
+public class TajoContainerIdPBImpl extends TajoContainerId {
+  ContainerProtocol.TajoContainerIdProto proto = null;
+  ContainerProtocol.TajoContainerIdProto.Builder builder = null;
+  private ApplicationAttemptId applicationAttemptId = null;
+
+  public TajoContainerIdPBImpl() {
+    builder = ContainerProtocol.TajoContainerIdProto.newBuilder();
+  }
+
+  public TajoContainerIdPBImpl(ContainerProtocol.TajoContainerIdProto proto) {
+    this.proto = proto;
+    this.applicationAttemptId = convertFromProtoFormat(proto.getAppAttemptId());
+  }
+
+  public ContainerProtocol.TajoContainerIdProto getProto() {
+    return proto;
+  }
+
+  @Override
+  public int getId() {
+    Preconditions.checkNotNull(proto);
+    return proto.getId();
+  }
+
+  @Override
+  protected void setId(int id) {
+    Preconditions.checkNotNull(builder);
+    builder.setId((id));
+  }
+
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return this.applicationAttemptId;
+  }
+
+  @Override
+  protected void setApplicationAttemptId(ApplicationAttemptId atId) {
+    if (atId != null) {
+      Preconditions.checkNotNull(builder);
+      builder.setAppAttemptId(convertToProtoFormat(atId));
+    }
+    this.applicationAttemptId = atId;
+  }
+
+  private ApplicationAttemptIdPBImpl convertFromProtoFormat(
+    ApplicationAttemptIdProto p) {
+    return new ApplicationAttemptIdPBImpl(p);
+  }
+
+  private ApplicationAttemptIdProto convertToProtoFormat(
+    ApplicationAttemptId t) {
+    return ((ApplicationAttemptIdPBImpl)t).getProto();
+  }
+
+  @Override
+  protected void build() {
+    proto = builder.build();
+    builder = null;
+  }
+}  
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
index 92e6695..cab2202 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
@@ -18,18 +18,19 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.container.TajoContainerId;
 
 /**
  * This event is sent to a running TaskAttempt on a worker.
  */
 public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> {
   private final QueryUnitAttemptId taskAttemptId;
-  private final ContainerId containerId;
+  private final TajoContainerId containerId;
 
-  public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, ContainerId containerId, LocalTaskEventType eventType) {
+  public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, TajoContainerId containerId,
+                        LocalTaskEventType eventType) {
     super(eventType);
     this.taskAttemptId = taskAttemptId;
     this.containerId = containerId;
@@ -39,7 +40,7 @@ public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> {
     return taskAttemptId;
   }
 
-  public ContainerId getContainerId() {
+  public TajoContainerId getContainerId() {
     return containerId;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
index a2acc7e..6e0d9fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
@@ -19,10 +19,10 @@
 package org.apache.tajo.master.event;
 
 import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
 import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.container.TajoContainerId;
 
 public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
   private final QueryUnitAttemptScheduleContext context;
@@ -44,7 +44,7 @@ public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
   }
 
   public static class QueryUnitAttemptScheduleContext {
-    private ContainerId containerId;
+    private TajoContainerId containerId;
     private String host;
     private RpcCallback<QueryUnitRequestProto> callback;
 
@@ -52,7 +52,7 @@ public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
 
     }
 
-    public QueryUnitAttemptScheduleContext(ContainerId containerId,
+    public QueryUnitAttemptScheduleContext(TajoContainerId containerId,
                                            String host,
                                            RpcCallback<QueryUnitRequestProto> callback) {
       this.containerId = containerId;
@@ -60,11 +60,11 @@ public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
       this.callback = callback;
     }
 
-    public ContainerId getContainerId() {
+    public TajoContainerId getContainerId() {
       return containerId;
     }
 
-    public void setContainerId(ContainerId containerId) {
+    public void setContainerId(TajoContainerId containerId) {
       this.containerId = containerId;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
index a8f4800..e617d53 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
@@ -18,21 +18,21 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.container.TajoContainer;
 
 import java.util.List;
 
 public class SubQueryContainerAllocationEvent extends SubQueryEvent {
-  private List<Container> allocatedContainer;
+  private List<TajoContainer> allocatedContainer;
 
   public SubQueryContainerAllocationEvent(final ExecutionBlockId id,
-                                          List<Container> allocatedContainer) {
+                                          List<TajoContainer> allocatedContainer) {
     super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED);
     this.allocatedContainer = allocatedContainer;
   }
 
-  public List<Container> getAllocatedContainer() {
+  public List<TajoContainer> getAllocatedContainer() {
     return this.allocatedContainer;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index e0928c5..3b9edcb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -18,22 +18,22 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
 
 public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
-  private final ContainerId cId;
+  private final TajoContainerId cId;
   private final WorkerConnectionInfo workerConnectionInfo;
 
-  public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId,
+  public TaskAttemptAssignedEvent(QueryUnitAttemptId id, TajoContainerId cId,
                                   WorkerConnectionInfo connectionInfo) {
     super(id, TaskAttemptEventType.TA_ASSIGNED);
     this.cId = cId;
     this.workerConnectionInfo = connectionInfo;
   }
 
-  public ContainerId getContainerId() {
+  public TajoContainerId getContainerId() {
     return cId;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 2197c33..9e8e3dd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -19,11 +19,11 @@
 package org.apache.tajo.master.event;
 
 import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
 import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
+import org.apache.tajo.master.container.TajoContainerId;
 
 public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
 
@@ -32,13 +32,13 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
   }
 
   private final int workerId;
-  private final ContainerId containerId;
+  private final TajoContainerId containerId;
   private final ExecutionBlockId executionBlockId;
 
   private final RpcCallback<QueryUnitRequestProto> callback;
 
   public TaskRequestEvent(int workerId,
-                          ContainerId containerId,
+                          TajoContainerId containerId,
                           ExecutionBlockId executionBlockId,
                           RpcCallback<QueryUnitRequestProto> callback) {
     super(TaskRequestEventType.TASK_REQ);
@@ -52,7 +52,7 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
     return this.workerId;
   }
 
-  public ContainerId getContainerId() {
+  public TajoContainerId getContainerId() {
     return this.containerId;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index d949ca4..e361c7f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -74,7 +75,7 @@ public class QueryInProgress extends CompositeService {
 
   private QueryMasterProtocolService queryMasterRpcClient;
 
-  private YarnProtos.ContainerIdProto qmContainerId;
+  private ContainerProtocol.TajoContainerIdProto qmContainerId;
 
   public QueryInProgress(
       TajoMaster.MasterContext masterContext,

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index f953995..f4bd8a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
@@ -33,6 +32,7 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.LazyTaskScheduler;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -130,7 +130,7 @@ public class QueryMasterManagerService extends CompositeService
       if(queryMasterTask == null || queryMasterTask.isStopped()) {
         done.run(LazyTaskScheduler.stopTaskRunnerReq);
       } else {
-        ContainerId cid =
+        TajoContainerId cid =
             queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
         LOG.debug("getTask:" + cid + ", ebId:" + ebId);
         queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index db6f130..d88173f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -21,7 +21,6 @@ package org.apache.tajo.master.querymaster;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.QueryUnitAttemptId;
@@ -35,6 +34,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.master.querymaster.QueryUnit.PullHost;
+import org.apache.tajo.master.container.TajoContainerId;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -55,7 +55,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   private final QueryUnit queryUnit;
   final EventHandler eventHandler;
 
-  private ContainerId containerId;
+  private TajoContainerId containerId;
   private WorkerConnectionInfo workerConnectionInfo;
   private int expire;
 
@@ -214,7 +214,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     return this.workerConnectionInfo;
   }
 
-  public void setContainerId(ContainerId containerId) {
+  public void setContainerId(TajoContainerId containerId) {
     this.containerId = containerId;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 476da04..39bb7ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -24,7 +24,6 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,6 +57,8 @@ import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.StorageManager;
@@ -105,7 +106,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private long finishTime;
 
   volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
-  volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+  volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
+    TajoContainer>();
 
   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
@@ -663,13 +665,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
   }
 
-  public void releaseContainer(ContainerId containerId) {
-    // try to kill the container.
-    ArrayList<Container> list = new ArrayList<Container>();
-    list.add(containers.get(containerId));
-    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), list));
-  }
-
   /**
    * It computes all stats and sets the intermediate result.
    */
@@ -1129,8 +1124,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       try {
         SubQueryContainerAllocationEvent allocationEvent =
             (SubQueryContainerAllocationEvent) event;
-        for (Container container : allocationEvent.getAllocatedContainer()) {
-          ContainerId cId = container.getId();
+        for (TajoContainer container : allocationEvent.getAllocatedContainer()) {
+          TajoContainerId cId = container.getId();
           if (subQuery.containers.containsKey(cId)) {
             subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
                 "Duplicated containers are allocated: " + cId.toString()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index 5d07ff2..bb8cc12 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -21,14 +21,13 @@ package org.apache.tajo.master.rm;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.ContainerProtocol;
 
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-
 /**
  * It's a worker resource manager context. It contains all context data about TajoWorkerResourceManager.
  */
@@ -43,7 +42,8 @@ public class TajoRMContext {
   private final ConcurrentMap<Integer, Worker> inactiveWorkers = Maps.newConcurrentMap();
 
   /** map between queryIds and query master ContainerId */
-  private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<QueryId, ContainerProtocol.TajoContainerIdProto> qmContainerMap = Maps
+    .newConcurrentMap();
 
   private final Set<Integer> liveQueryMasterWorkerResources =
       Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
@@ -77,7 +77,7 @@ public class TajoRMContext {
    *
    * @return The Map for query master containers
    */
-  public ConcurrentMap<QueryId, ContainerIdProto> getQueryMasterContainer() {
+  public ConcurrentMap<QueryId, ContainerProtocol.TajoContainerIdProto> getQueryMasterContainer() {
     return qmContainerMap;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
index 4d6cbd2..3d28d85 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
@@ -19,9 +19,12 @@
 package org.apache.tajo.master.rm;
 
 import org.apache.hadoop.yarn.api.records.*;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
 
-public class TajoWorkerContainer extends Container {
-  ContainerId id;
+
+public class TajoWorkerContainer extends TajoContainer {
+  TajoContainerId id;
   NodeId nodeId;
   Worker worker;
 
@@ -34,12 +37,12 @@ public class TajoWorkerContainer extends Container {
   }
 
   @Override
-  public ContainerId getId() {
+  public TajoContainerId getId() {
     return id;
   }
 
   @Override
-  public void setId(ContainerId id) {
+  public void setId(TajoContainerId id) {
     this.id = id;
   }
 
@@ -94,7 +97,7 @@ public class TajoWorkerContainer extends Container {
   }
 
   @Override
-  public int compareTo(Container container) {
-    return 0;  //To change body of implemented methods use File | Settings | File Templates.
+  public int compareTo(TajoContainer container) {
+    return getId().compareTo(container.getId());
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
index 634ad2b..184de71 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
@@ -19,10 +19,11 @@
 package org.apache.tajo.master.rm;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
 
-public class TajoWorkerContainerId extends ContainerId {
+public class TajoWorkerContainerId extends TajoContainerId {
   ApplicationAttemptId applicationAttemptId;
   int id;
 
@@ -46,43 +47,43 @@ public class TajoWorkerContainerId extends ContainerId {
     this.id = id;
   }
 
-  public YarnProtos.ContainerIdProto getProto() {
+  public ContainerProtocol.TajoContainerIdProto getProto() {
     YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
-        .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
-        .setId(applicationAttemptId.getApplicationId().getId())
-        .build();
+      .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
+      .setId(applicationAttemptId.getApplicationId().getId())
+      .build();
 
     YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
-        .setAttemptId(applicationAttemptId.getAttemptId())
-        .setApplicationId(appIdProto)
-        .build();
+      .setAttemptId(applicationAttemptId.getAttemptId())
+      .setApplicationId(appIdProto)
+      .build();
 
-    return YarnProtos.ContainerIdProto.newBuilder()
-        .setAppAttemptId(attemptIdProto)
-        .setAppId(appIdProto)
-        .setId(id)
-        .build();
+    return ContainerProtocol.TajoContainerIdProto.newBuilder()
+      .setAppAttemptId(attemptIdProto)
+      .setAppId(appIdProto)
+      .setId(id)
+      .build();
   }
 
-  public static YarnProtos.ContainerIdProto getContainerIdProto(ContainerId containerId) {
+  public static ContainerProtocol.TajoContainerIdProto getContainerIdProto(TajoContainerId containerId) {
     if(containerId instanceof TajoWorkerContainerId) {
       return ((TajoWorkerContainerId)containerId).getProto();
     } else {
       YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
-          .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
-          .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
-          .build();
+        .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
+        .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
+        .build();
 
       YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
-          .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
-          .setApplicationId(appIdProto)
-          .build();
+        .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
+        .setApplicationId(appIdProto)
+        .build();
 
-      return YarnProtos.ContainerIdProto.newBuilder()
-          .setAppAttemptId(attemptIdProto)
-          .setAppId(appIdProto)
-          .setId(containerId.getId())
-          .build();
+      return ContainerProtocol.TajoContainerIdProto.newBuilder()
+        .setAppAttemptId(attemptIdProto)
+        .setAppId(appIdProto)
+        .setId(containerId.getId())
+        .build();
     }
   }
 


Mime
View raw message