Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 95D7510211 for ; Mon, 19 Aug 2013 10:38:16 +0000 (UTC) Received: (qmail 26207 invoked by uid 500); 19 Aug 2013 10:38:16 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 26183 invoked by uid 500); 19 Aug 2013 10:38:16 -0000 Mailing-List: contact commits-help@tajo.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.incubator.apache.org Delivered-To: mailing list commits@tajo.incubator.apache.org Received: (qmail 26176 invoked by uid 99); 19 Aug 2013 10:38:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Aug 2013 10:38:15 +0000 X-ASF-Spam-Status: No, hits=-2000.8 required=5.0 tests=ALL_TRUSTED,FB_GET_MEDS,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 19 Aug 2013 10:38:13 +0000 Received: (qmail 26039 invoked by uid 99); 19 Aug 2013 10:37:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Aug 2013 10:37:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C9F3A8BF7EB; Mon, 19 Aug 2013 10:37:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhkim@apache.org To: commits@tajo.incubator.apache.org Message-Id: <755757b2c93f46be95623d9171be40d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TAJO-130: Same queryConf file conflicts. (jinho) Date: Mon, 19 Aug 2013 10:37:51 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 49607a542 -> df09fd22c TAJO-130: Same queryConf file conflicts. (jinho) Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/df09fd22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/df09fd22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/df09fd22 Branch: refs/heads/master Commit: df09fd22cc99585d3251c8f2d4e0499c7f566309 Parents: 49607a5 Author: jinossy Authored: Mon Aug 19 19:33:26 2013 +0900 Committer: jinossy Committed: Mon Aug 19 19:33:26 2013 +0900 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../java/org/apache/tajo/conf/TajoConf.java | 2 +- .../main/java/org/apache/tajo/QueryConf.java | 1 + .../org/apache/tajo/master/ContainerProxy.java | 23 ++++++++++++---- .../tajo/master/TaskRunnerLauncherImpl.java | 10 +------ .../apache/tajo/master/TaskSchedulerImpl.java | 12 +++++---- .../master/querymaster/QueryMasterManager.java | 6 ++--- .../master/querymaster/QueryMasterRunner.java | 2 +- .../tajo/master/querymaster/SubQuery.java | 4 ++- .../tajo/master/rm/RMContainerAllocator.java | 28 +++++++++++--------- .../src/main/resources/tajo-default.xml | 10 ------- .../org/apache/tajo/TajoTestingCluster.java | 2 ++ 12 files changed, 55 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6f4cd5b..f41d352 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -104,6 +104,8 @@ Release 0.2.0 - unreleased BUG FIXES + TAJO-130: Same queryConf file conflicts. (jinho) + TAJO-82: NullPointerException occurs when Schema is converted as an array of columns. (jihoon) http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index f4b20ff..21d0c63 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -86,7 +86,7 @@ public class TajoConf extends YarnConfiguration { /** how many launching TaskRunners in parallel */ AM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.master.taskrunnerlauncher.parallel.num", 16), - + MAX_WORKER_PER_NODE("tajo.query.max-workernum.per.node", 8), ////////////////////////////////// // Pull Server http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java index e88217a..34144ec 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/QueryConf.java @@ -24,6 +24,7 @@ import org.apache.tajo.conf.TajoConf; public class QueryConf extends TajoConf { public static String FILENAME = "queryconf.xml"; + public static String QUERY_MASTER_FILENAME = "querymasterconf.xml"; public QueryConf() { super(); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java index b935eb7..7ae01d6 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.tajo.QueryConf; +import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.querymaster.QueryMaster; import org.apache.tajo.pullserver.PullServerAuxService; @@ -226,7 +227,7 @@ public abstract class ContainerProxy { } } - public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config) { + public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config, String queryId, boolean isMaster) { TajoConf conf = (TajoConf)config; ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); @@ -309,16 +310,28 @@ public abstract class ContainerProxy { LOG.error(e.getMessage(), e); } - LOG.info("Writing a QueryConf to HDFS and add to local environment"); - Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME); + LOG.info("Writing a QueryConf to HDFS and add to local environment"); try { - writeConf(conf, queryConfPath); + // TODO move to tajo temp + Path warehousePath = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR); + Path queryConfPath = new Path(warehousePath, queryId); + if(isMaster) { + queryConfPath = new Path(queryConfPath, QueryConf.QUERY_MASTER_FILENAME); + } else { + queryConfPath = new Path(queryConfPath, QueryConf.FILENAME); + } + if(!fs.exists(queryConfPath)){ + writeConf(conf, queryConfPath); + } else { + LOG.warn("QueryConf already exist. path: " + queryConfPath.toString()); + } LocalResource queryConfSrc = createApplicationResource(fsCtx, queryConfPath, LocalResourceType.FILE); - localResources.put(QueryConf.FILENAME, queryConfSrc); + localResources.put(queryConfPath.getName(), queryConfSrc); ctx.setLocalResources(localResources); + } catch (IOException e) { LOG.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java index d8ddb46..e6d4c56 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java @@ -70,14 +70,6 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne public void stop() { executorService.shutdownNow(); - - while(!executorService.isTerminated()) { - LOG.info("executorService.isTerminated:" + executorService.isTerminated() + "," + executorService.isShutdown()); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - } Map containers = context.getContainers(); for(ContainerProxy eachProxy: containers.values()) { try { @@ -98,7 +90,7 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne } private void launchTaskRunners(SubQueryId subQueryId, Collection containers) { - commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig()); + commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig(), subQueryId.toString(), false); for (Container container : containers) { final ContainerProxy proxy = new TaskRunnerContainerProxy(context, getConfig(), context.getYarnRPC(), container, subQueryId); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java index 62e702d..fc776e3 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.service.AbstractService; @@ -111,7 +112,7 @@ public class TaskSchedulerImpl extends AbstractService while(!stopEventHandling && !Thread.currentThread().isInterrupted()) { try { - Thread.sleep(1000); + Thread.sleep(100); } catch (InterruptedException e) { break; } @@ -257,14 +258,15 @@ public class TaskSchedulerImpl extends AbstractService public void addLeafTask(TaskScheduleEvent event) { for (String host : event.getHosts()) { - LinkedList list = leafTasksHostMapping.get(host); + String hostName = NetUtils.normalizeHostName(host); + LinkedList list = leafTasksHostMapping.get(hostName); if (list == null) { list = new LinkedList(); - leafTasksHostMapping.put(host, list); + leafTasksHostMapping.put(hostName, list); } list.add(event.getAttemptId()); if (LOG.isDebugEnabled()) { - LOG.debug("Added attempt req to host " + host); + LOG.debug("Added attempt req to host " + hostName); } } for (String rack: event.getRacks()) { @@ -304,7 +306,7 @@ public class TaskSchedulerImpl extends AbstractService while (it.hasNext() && leafTasks.size() > 0) { taskRequest = it.next(); ContainerProxy container = context.getContainer(taskRequest.getContainerId()); - String hostName = container.getTaskHostName(); + String hostName = NetUtils.normalizeHostName(container.getTaskHostName()); QueryUnitAttemptId attemptId = null; http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java index 47adf7d..35d7201 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java @@ -131,7 +131,7 @@ public class QueryMasterManager extends CompositeService { break; } try { - Thread.sleep(1000); + Thread.sleep(100); } catch (InterruptedException e) { } } @@ -244,7 +244,7 @@ public class QueryMasterManager extends CompositeService { appContext.setQueue("default"); ContainerLaunchContext commonContainerLaunchContext = - ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf()); + ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true); // Setup environment by cloning from common env. Map env = commonContainerLaunchContext.getEnvironment(); @@ -288,7 +288,7 @@ public class QueryMasterManager extends CompositeService { final Resource resource = Records.newRecord(Resource.class); // TODO - get default value from conf - resource.setMemory(2048); + resource.setMemory(2000); resource.setVirtualCores(1); Map myServiceData = new HashMap(); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java index f34464b..daab9fd 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java @@ -86,7 +86,7 @@ public class QueryMasterRunner extends AbstractService { LOG.info("QueryMasterRunner started"); final QueryConf conf = new QueryConf(); - conf.addResource(new Path(QueryConf.FILENAME)); + conf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME)); UserGroupInformation.setConfiguration(conf); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index d5d4dfd..94a6af8 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -616,7 +616,9 @@ public class SubQuery implements EventHandler { QueryUnit [] tasks = subQuery.getQueryUnits(); int numClusterNodes = subQuery.getContext().getNumClusterNode(); - int numRequest = Math.min(tasks.length, numClusterNodes * 4); + TajoConf conf = subQuery.getContext().getConf(); + int workerNum = conf.getIntVar(ConfVars.MAX_WORKER_PER_NODE); + int numRequest = Math.min(tasks.length, numClusterNodes * workerNum); final Resource resource = Records.newRecord(Resource.class); // TODO - for each different subquery, the volume of resource should be different. http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java index c01cabc..c02ddaf 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class RMContainerAllocator extends AMRMClientImpl implements EventHandler { @@ -95,7 +96,7 @@ public class RMContainerAllocator extends AMRMClientImpl protected Thread allocatorThread; private final AtomicBoolean stopped = new AtomicBoolean(false); - private int rmPollInterval = 1000;//millis + private int rmPollInterval = 100;//millis protected void startAllocatorThread() { allocatorThread = new Thread(new Runnable() { @Override @@ -135,16 +136,23 @@ public class RMContainerAllocator extends AMRMClientImpl private final Map subQueryMap = new HashMap(); + private AtomicLong prevReportTime = new AtomicLong(0); + private int reportInterval = 5 * 1000; // second public void heartbeat() throws Exception { AllocateResponse allocateResponse = allocate(context.getProgress()); AMResponse response = allocateResponse.getAMResponse(); List allocatedContainers = response.getAllocatedContainers(); - LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes()); - LOG.info("Available Resource: " + response.getAvailableResources()); - LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size()); - if (response.getAllocatedContainers().size() > 0) { + long currentTime = System.currentTimeMillis(); + if((currentTime - prevReportTime.longValue()) >= reportInterval){ + LOG.debug("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes()); + LOG.debug("Num of Allocated Containers: " + allocatedContainers.size()); + LOG.info("Available Resource: " + response.getAvailableResources()); + prevReportTime.set(currentTime); + } + + if (allocatedContainers.size() > 0) { LOG.info("================================================================"); for (Container container : response.getAllocatedContainers()) { LOG.info("> Container Id: " + container.getId()); @@ -154,18 +162,14 @@ public class RMContainerAllocator extends AMRMClientImpl LOG.info("> Priority: " + container.getPriority()); } LOG.info("================================================================"); - } - Map> allocated = new HashMap>(); - if (allocatedContainers.size() > 0) { + Map> allocated = new HashMap>(); + for (Container container : allocatedContainers) { SubQueryId subQueryId = subQueryMap.get(container.getPriority()); SubQueryState state = context.getSubQuery(subQueryId).getState(); - if (!(isRunningState(state) && subQueryMap.containsKey(container.getPriority()))) { + if (!(isRunningState(state))) { releaseAssignedContainer(container.getId()); - synchronized (subQueryMap) { - subQueryMap.remove(container.getPriority()); - } } else { if (allocated.containsKey(subQueryId)) { allocated.get(subQueryId).add(container); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml index f3da922..a1a111c 100644 --- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml +++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml @@ -37,16 +37,6 @@ - tajo.master.clientservice.addr - 127.0.0.1:9004 - - - - tajo.master.querymastermanager.addr - 127.0.0.1:9005 - - - tajo.query.session.timeout 60000 ms http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df09fd22/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java index 30770cb..32b1f56 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -154,6 +155,7 @@ public class TajoTestingCluster { System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, this.clusterTestBuildDir.toString()); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf)); builder.hosts(hosts); builder.numDataNodes(servers);