tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject git commit: TAJO-130: Same queryConf file conflicts. (jinho)
Date Mon, 19 Aug 2013 10:37:51 GMT
Updated Branches:
  refs/heads/master 49607a542 -> df09fd22c


TAJO-130: Same queryConf file conflicts. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/df09fd22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/df09fd22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/df09fd22

Branch: refs/heads/master
Commit: df09fd22cc99585d3251c8f2d4e0499c7f566309
Parents: 49607a5
Author: jinossy <jinossy@gmail.com>
Authored: Mon Aug 19 19:33:26 2013 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Mon Aug 19 19:33:26 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/tajo/conf/TajoConf.java     |  2 +-
 .../main/java/org/apache/tajo/QueryConf.java    |  1 +
 .../org/apache/tajo/master/ContainerProxy.java  | 23 ++++++++++++----
 .../tajo/master/TaskRunnerLauncherImpl.java     | 10 +------
 .../apache/tajo/master/TaskSchedulerImpl.java   | 12 +++++----
 .../master/querymaster/QueryMasterManager.java  |  6 ++---
 .../master/querymaster/QueryMasterRunner.java   |  2 +-
 .../tajo/master/querymaster/SubQuery.java       |  4 ++-
 .../tajo/master/rm/RMContainerAllocator.java    | 28 +++++++++++---------
 .../src/main/resources/tajo-default.xml         | 10 -------
 .../org/apache/tajo/TajoTestingCluster.java     |  2 ++
 12 files changed, 55 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f4cd5b..f41d352 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -104,6 +104,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-130: Same queryConf file conflicts. (jinho)
+
     TAJO-82: NullPointerException occurs when Schema is converted as an array 
     of columns. (jihoon)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index f4b20ff..21d0c63 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -86,7 +86,7 @@ public class TajoConf extends YarnConfiguration {
     /** how many launching TaskRunners in parallel */
     AM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.master.taskrunnerlauncher.parallel.num", 16),
 
-
+    MAX_WORKER_PER_NODE("tajo.query.max-workernum.per.node", 8),
 
     //////////////////////////////////
     // Pull Server

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java
index e88217a..34144ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java
@@ -24,6 +24,7 @@ import org.apache.tajo.conf.TajoConf;
 
 public class QueryConf extends TajoConf {
   public static String FILENAME = "queryconf.xml";
+  public static String QUERY_MASTER_FILENAME = "querymasterconf.xml";
 
   public QueryConf() {
     super();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
index b935eb7..7ae01d6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tajo.QueryConf;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.querymaster.QueryMaster;
 import org.apache.tajo.pullserver.PullServerAuxService;
@@ -226,7 +227,7 @@ public abstract class ContainerProxy {
     }
   }
 
-  public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config)
{
+  public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config,
String queryId, boolean isMaster) {
     TajoConf conf = (TajoConf)config;
 
     ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
@@ -309,16 +310,28 @@ public abstract class ContainerProxy {
       LOG.error(e.getMessage(), e);
     }
 
-    LOG.info("Writing a QueryConf to HDFS and add to local environment");
 
-    Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME);
+    LOG.info("Writing a QueryConf to HDFS and add to local environment");
     try {
-      writeConf(conf, queryConfPath);
+      // TODO move to tajo temp
+      Path warehousePath = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
+      Path queryConfPath = new Path(warehousePath, queryId);
+      if(isMaster) {
+        queryConfPath = new Path(queryConfPath, QueryConf.QUERY_MASTER_FILENAME);
+      } else {
+        queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
+      }
 
+      if(!fs.exists(queryConfPath)){
+        writeConf(conf, queryConfPath);
+      } else {
+        LOG.warn("QueryConf already exist. path: "  + queryConfPath.toString());
+      }
       LocalResource queryConfSrc = createApplicationResource(fsCtx, queryConfPath, LocalResourceType.FILE);
-      localResources.put(QueryConf.FILENAME,  queryConfSrc);
 
+      localResources.put(queryConfPath.getName(), queryConfSrc);
       ctx.setLocalResources(localResources);
+
     } catch (IOException e) {
       LOG.error(e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
index d8ddb46..e6d4c56 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
@@ -70,14 +70,6 @@ public class TaskRunnerLauncherImpl extends AbstractService implements
TaskRunne
 
   public void stop() {
     executorService.shutdownNow();
-
-    while(!executorService.isTerminated()) {
-      LOG.info("executorService.isTerminated:" + executorService.isTerminated() + "," + executorService.isShutdown());
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-      }
-    }
     Map<ContainerId, ContainerProxy> containers = context.getContainers();
     for(ContainerProxy eachProxy: containers.values()) {
       try {
@@ -98,7 +90,7 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
   }
 
   private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers)
{
-    commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig());
+    commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig(),
subQueryId.toString(), false);
     for (Container container : containers) {
       final ContainerProxy proxy =
           new TaskRunnerContainerProxy(context, getConfig(), context.getYarnRPC(), container,
subQueryId);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 62e702d..fc776e3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -111,7 +112,7 @@ public class TaskSchedulerImpl extends AbstractService
 
         while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
           try {
-            Thread.sleep(1000);
+            Thread.sleep(100);
           } catch (InterruptedException e) {
             break;
           }
@@ -257,14 +258,15 @@ public class TaskSchedulerImpl extends AbstractService
 
     public void addLeafTask(TaskScheduleEvent event) {
       for (String host : event.getHosts()) {
-        LinkedList<QueryUnitAttemptId> list = leafTasksHostMapping.get(host);
+        String hostName = NetUtils.normalizeHostName(host);
+        LinkedList<QueryUnitAttemptId> list = leafTasksHostMapping.get(hostName);
         if (list == null) {
           list = new LinkedList<QueryUnitAttemptId>();
-          leafTasksHostMapping.put(host, list);
+          leafTasksHostMapping.put(hostName, list);
         }
         list.add(event.getAttemptId());
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Added attempt req to host " + host);
+          LOG.debug("Added attempt req to host " + hostName);
         }
       }
       for (String rack: event.getRacks()) {
@@ -304,7 +306,7 @@ public class TaskSchedulerImpl extends AbstractService
       while (it.hasNext() && leafTasks.size() > 0) {
         taskRequest = it.next();
         ContainerProxy container = context.getContainer(taskRequest.getContainerId());
-        String hostName = container.getTaskHostName();
+        String hostName = NetUtils.normalizeHostName(container.getTaskHostName());
 
         QueryUnitAttemptId attemptId = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
index 47adf7d..35d7201 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
@@ -131,7 +131,7 @@ public class QueryMasterManager extends CompositeService {
         break;
       }
       try {
-        Thread.sleep(1000);
+        Thread.sleep(100);
       } catch (InterruptedException e) {
       }
     }
@@ -244,7 +244,7 @@ public class QueryMasterManager extends CompositeService {
     appContext.setQueue("default");
 
     ContainerLaunchContext commonContainerLaunchContext =
-            ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf());
+            ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(),
true);
 
     // Setup environment by cloning from common env.
     Map<String, String> env = commonContainerLaunchContext.getEnvironment();
@@ -288,7 +288,7 @@ public class QueryMasterManager extends CompositeService {
 
     final Resource resource = Records.newRecord(Resource.class);
     // TODO - get default value from conf
-    resource.setMemory(2048);
+    resource.setMemory(2000);
     resource.setVirtualCores(1);
 
     Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
index f34464b..daab9fd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -86,7 +86,7 @@ public class QueryMasterRunner extends AbstractService {
     LOG.info("QueryMasterRunner started");
 
     final QueryConf conf = new QueryConf();
-    conf.addResource(new Path(QueryConf.FILENAME));
+    conf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
 
     UserGroupInformation.setConfiguration(conf);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index d5d4dfd..94a6af8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -616,7 +616,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       QueryUnit [] tasks = subQuery.getQueryUnits();
 
       int numClusterNodes = subQuery.getContext().getNumClusterNode();
-      int numRequest = Math.min(tasks.length, numClusterNodes * 4);
+      TajoConf conf =  subQuery.getContext().getConf();
+      int workerNum = conf.getIntVar(ConfVars.MAX_WORKER_PER_NODE);
+      int numRequest = Math.min(tasks.length, numClusterNodes * workerNum);
 
       final Resource resource = Records.newRecord(Resource.class);
       // TODO - for each different subquery, the volume of resource should be different.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
index c01cabc..c02ddaf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class RMContainerAllocator extends AMRMClientImpl
     implements EventHandler<ContainerAllocationEvent> {
@@ -95,7 +96,7 @@ public class RMContainerAllocator extends AMRMClientImpl
 
   protected Thread allocatorThread;
   private final AtomicBoolean stopped = new AtomicBoolean(false);
-  private int rmPollInterval = 1000;//millis
+  private int rmPollInterval = 100;//millis
   protected void startAllocatorThread() {
     allocatorThread = new Thread(new Runnable() {
       @Override
@@ -135,16 +136,23 @@ public class RMContainerAllocator extends AMRMClientImpl
 
   private final Map<Priority, SubQueryId> subQueryMap =
       new HashMap<Priority, SubQueryId>();
+  private AtomicLong prevReportTime = new AtomicLong(0);
+  private int reportInterval = 5 * 1000; // second
 
   public void heartbeat() throws Exception {
     AllocateResponse allocateResponse = allocate(context.getProgress());
     AMResponse response = allocateResponse.getAMResponse();
     List<Container> allocatedContainers = response.getAllocatedContainers();
 
-    LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
-    LOG.info("Available Resource: " + response.getAvailableResources());
-    LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
-    if (response.getAllocatedContainers().size() > 0) {
+    long currentTime = System.currentTimeMillis();
+    if((currentTime - prevReportTime.longValue()) >= reportInterval){
+      LOG.debug("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
+      LOG.debug("Num of Allocated Containers: " + allocatedContainers.size());
+      LOG.info("Available Resource: " + response.getAvailableResources());
+      prevReportTime.set(currentTime);
+    }
+
+    if (allocatedContainers.size() > 0) {
       LOG.info("================================================================");
       for (Container container : response.getAllocatedContainers()) {
         LOG.info("> Container Id: " + container.getId());
@@ -154,18 +162,14 @@ public class RMContainerAllocator extends AMRMClientImpl
         LOG.info("> Priority: " + container.getPriority());
       }
       LOG.info("================================================================");
-    }
 
-    Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>();
-    if (allocatedContainers.size() > 0) {
+      Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId,
List<Container>>();
+
       for (Container container : allocatedContainers) {
         SubQueryId subQueryId = subQueryMap.get(container.getPriority());
         SubQueryState state = context.getSubQuery(subQueryId).getState();
-        if (!(isRunningState(state) && subQueryMap.containsKey(container.getPriority())))
{
+        if (!(isRunningState(state))) {
           releaseAssignedContainer(container.getId());
-          synchronized (subQueryMap) {
-            subQueryMap.remove(container.getPriority());
-          }
         } else {
           if (allocated.containsKey(subQueryId)) {
             allocated.get(subQueryId).add(container);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index f3da922..a1a111c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -37,16 +37,6 @@
   </property>
 
   <property>
-    <name>tajo.master.clientservice.addr</name>
-    <value>127.0.0.1:9004</value>
-  </property>
-
-  <property>
-    <name>tajo.master.querymastermanager.addr</name>
-    <value>127.0.0.1:9005</value>
-  </property>
-
-  <property>
     <name>tajo.query.session.timeout</name>
     <value>60000</value>
     <description>ms</description>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 30770cb..32b1f56 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -154,6 +155,7 @@ public class TajoTestingCluster {
     System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
         this.clusterTestBuildDir.toString());
 
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
     builder.hosts(hosts);
     builder.numDataNodes(servers);


Mime
View raw message