tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [09/39] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1730
Date Wed, 02 Sep 2015 11:06:32 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/77dfbbdf/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --cc tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0000000,71ef0ea..a0f1280
mode 000000,100644..100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@@ -1,0 -1,778 +1,778 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo;
+ 
+ import com.google.common.base.Charsets;
+ import com.google.common.io.Closeables;
+ import com.google.common.io.Files;
+ import org.apache.commons.lang.StringUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.fs.*;
+ import org.apache.hadoop.hdfs.DFSConfigKeys;
+ import org.apache.hadoop.hdfs.HdfsConfiguration;
+ import org.apache.hadoop.hdfs.MiniDFSCluster;
+ import org.apache.hadoop.util.ShutdownHookManager;
+ import org.apache.log4j.Level;
+ import org.apache.log4j.Logger;
+ import org.apache.tajo.catalog.*;
+ import org.apache.tajo.client.TajoClient;
+ import org.apache.tajo.client.TajoClientImpl;
+ import org.apache.tajo.client.TajoClientUtil;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.conf.TajoConf.ConfVars;
+ import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
+ import org.apache.tajo.master.TajoMaster;
+ import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider;
+ import org.apache.tajo.querymaster.Query;
+ import org.apache.tajo.querymaster.QueryMasterTask;
+ import org.apache.tajo.querymaster.Stage;
+ import org.apache.tajo.querymaster.StageState;
+ import org.apache.tajo.service.ServiceTrackerFactory;
+ import org.apache.tajo.storage.FileTablespace;
+ import org.apache.tajo.storage.TablespaceManager;
+ import org.apache.tajo.util.CommonTestingUtil;
+ import org.apache.tajo.util.KeyValueSet;
+ import org.apache.tajo.util.NetUtils;
+ import org.apache.tajo.util.Pair;
+ import org.apache.tajo.worker.TajoWorker;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.Writer;
+ import java.net.InetSocketAddress;
+ import java.net.URI;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.TimeZone;
+ import java.util.UUID;
+ 
+ public class TajoTestingCluster {
+ 	private static Log LOG = LogFactory.getLog(TajoTestingCluster.class);
+ 	private TajoConf conf;
+   private FileSystem defaultFS;
+   private MiniDFSCluster dfsCluster;
+ 	private MiniCatalogServer catalogServer;
+   private HBaseTestClusterUtil hbaseUtil;
+ 
+   private TajoMaster tajoMaster;
+   private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
+   private boolean isDFSRunning = false;
+   private boolean isTajoClusterRunning = false;
+   private boolean isCatalogServerRunning = false;
+ 
+ 	private File clusterTestBuildDir = null;
+ 
+ 	/**
+ 	 * Default parent directory for test output.
+ 	 */
+ 	public static final String DEFAULT_TEST_DIRECTORY = "target/" + 
+ 	    System.getProperty("tajo.test.data.dir", "test-data");
+ 
+   /**
+    * True If HiveCatalogStore is used. Otherwise, it is FALSE.
+    */
+   public Boolean isHiveCatalogStoreUse = false;
+ 
+   private static final String LOG_LEVEL;
+ 
+   static {
+     LOG_LEVEL = System.getProperty("LOG_LEVEL");
+   }
+ 
+   /**
+    * Creates a testing cluster by delegating to {@link #TajoTestingCluster(boolean)} with
+    * {@code false}, i.e. with master HA mode disabled.
+    */
+   public TajoTestingCluster() {
+     this(false);
+   }
+ 
+   /**
+    * Creates a testing cluster with a fresh {@link TajoConf}. No daemon is started here; the
+    * constructor only prepares the configuration and the test directory.
+    *
+    * @param masterHaEMode value stored under {@code ConfVars.TAJO_MASTER_HA_ENABLE}
+    */
+   public TajoTestingCluster(boolean masterHaEMode) {
+     this.conf = new TajoConf();
+     this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode);
+ 
+     // The helpers below read and write this.conf, so it must be assigned before they run.
+     initTestDir();
+     setTestingFlagProperties();
+     initPropertiesAndConfigs();
+   }
+ 
+   /**
+    * Sets {@code CommonTestingUtil.TAJO_TEST_KEY} to {@code CommonTestingUtil.TAJO_TEST_TRUE}
+    * both as a JVM system property and in this cluster's configuration.
+    */
+   void setTestingFlagProperties() {
+     System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+     conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+   }
+ 
+   /**
+    * Applies the settings shared by every test cluster: the time zone, the plan-rewrite test
+    * rule providers, worker resource limits, RPC thread-pool sizes, the Python code directory
+    * and, when the {@code LOG_LEVEL} system property is set, the level of several loggers.
+    */
+   void initPropertiesAndConfigs() {
+     // Use one zone for both the Tajo system time zone and the JVM default.
+     TimeZone systemTimeZone = TimeZone.getTimeZone(TajoConstants.DEFAULT_SYSTEM_TIMEZONE);
+     conf.setSystemTimezone(systemTimeZone);
+     TimeZone.setDefault(systemTimeZone);
+ 
+     // Injection of equality testing code of logical plan (de)serialization
+     conf.setClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, LogicalPlanTestRuleProvider.class);
+     conf.setClassVar(ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, GlobalPlanTestRuleProvider.class);
+ 
+     // Worker resources
+     conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4);
+     conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000);
+     conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM.varname, 3);
+     conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 2);
+ 
+     // Client API RPC
+     conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2);
+ 
+     // Client API service RPC servers
+     conf.setIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+     conf.setIntVar(ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+     conf.setIntVar(ConfVars.REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+ 
+     // Internal RPC clients
+     conf.setIntVar(ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM, 2);
+     conf.setIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM, 2);
+ 
+     // Internal RPC servers
+     conf.setIntVar(ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
+     conf.setIntVar(ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
+     conf.setIntVar(ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2);
+     conf.setIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2);
+     conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+ 
+     // Memory cache termination
+     conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
+ 
+     // Python function path
+     conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString());
+ 
+     /* Since Travis CI limits the size of standard output log up to 4MB */
+     if (StringUtils.isEmpty(LOG_LEVEL)) {
+       return;
+     }
+     // An unparsable LOG_LEVEL falls back to the root logger's current level.
+     Level fallbackLevel = Logger.getRootLogger().getLevel();
+     Level requestedLevel = Level.toLevel(LOG_LEVEL.toUpperCase(), fallbackLevel);
+     String[] loggerNames = {
+         "org.apache.tajo",
+         "org.apache.hadoop",
+         "org.apache.zookeeper",
+         "BlockStateChange",
+         "org.mortbay.log"
+     };
+     for (String loggerName : loggerNames) {
+       Logger.getLogger(loggerName).setLevel(requestedLevel);
+     }
+   }
+ 
+ 	/**
+ 	 * @return the {@link TajoConf} instance held by this testing cluster (the same object,
+ 	 * not a copy)
+ 	 */
+ 	public TajoConf getConfiguration() {
+ 		return this.conf;
+ 	}
+ 
+   /**
+    * Chooses the test build directory via {@link #setupClusterTestBuildDir()} unless one has
+    * already been assigned.
+    */
+   public void initTestDir() {
+     if (clusterTestBuildDir != null) {
+       return;
+     }
+     clusterTestBuildDir = setupClusterTestBuildDir();
+   }
+ 
+ 	/**
+ 	 * Returns the local directory this cluster instance uses for test data.
+ 	 *
+ 	 * @return the current value of {@code clusterTestBuildDir}; it is assigned by
+ 	 * {@link #initTestDir()} and may be reset to {@code null} by {@link #shutdownMiniCluster()}
+ 	 * @see #setupClusterTestBuildDir()
+ 	 */
+ 	public File getTestDir() {
+ 		return clusterTestBuildDir;
+ 	}
+ 
+ 	/**
+ 	 * @param subdirName
+ 	 * @return Path to a subdirectory named <code>subdirName</code> under
+ 	 * {@link #getTestDir()}.
+ 	 * @see #setupClusterTestBuildDir()
+ 	 */
+ 	public static File getTestDir(final String subdirName) {
+ 		return new File(new File(DEFAULT_TEST_DIRECTORY), subdirName);
+   }
+ 
+ 	/**
+ 	 * Picks a fresh, randomly named (UUID) directory under {@link #DEFAULT_TEST_DIRECTORY}.
+ 	 * The directory is not created here.
+ 	 *
+ 	 * @return the absolute form of the chosen directory
+ 	 */
+ 	public static File setupClusterTestBuildDir() {
+ 		File dir = getTestDir(UUID.randomUUID().toString()).getAbsoluteFile();
+ 		// Have it cleaned up on exit
+ 		dir.deleteOnExit();
+ 		return dir;
+ 	}
+ 
+   ////////////////////////////////////////////////////////
+   // HDFS Section
+   ////////////////////////////////////////////////////////
+   /**
+    * Start a minidfscluster homed in this cluster's test build directory, without explicit
+    * datanode host names.
+    *
+    * @param servers How many DNs to start.
+    * @throws Exception if the mini DFS cluster cannot be started
+    * @see #shutdownMiniDFSCluster()
+    * @return The mini dfs cluster created.
+    */
+   public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
+     // A null directory is dereferenced by the three-argument overload, so pass the test
+     // build directory; re-create it first in case a previous shutdown reset it.
+     initTestDir();
+     return startMiniDFSCluster(servers, clusterTestBuildDir, null);
+   }
+ 
+   /**
+    * Start a minidfscluster and make it the default file system of this testing cluster.
+    * Can only create one.
+    *
+    * @param servers How many DNs to start.
+    * @param dir Where to home your dfs cluster; when {@code null}, the cluster's test build
+    *            directory is used.
+    * @param hosts hostnames DNs to run on.
+    * @see #shutdownMiniDFSCluster()
+    * @return The mini dfs cluster created.
+    * @throws java.io.IOException if the mini DFS cluster cannot be built
+    */
+   public MiniDFSCluster startMiniDFSCluster(int servers,
+                                             File dir,
+                                             final String hosts[])
+       throws IOException {
+ 
+     // startMiniDFSCluster(int) passes null here; fall back instead of throwing an NPE.
+     File baseDir = dir;
+     if (baseDir == null) {
+       initTestDir();
+       baseDir = clusterTestBuildDir;
+     }
+ 
+     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
+     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
+     builder.hosts(hosts);
+     builder.numDataNodes(servers);
+     builder.format(true);
+     builder.manageNameDfsDirs(true);
+     builder.manageDataDfsDirs(true);
+     builder.waitSafeMode(true);
+     this.dfsCluster = builder.build();
+ 
+     // Set this just-started cluster as our filesystem.
+     this.defaultFS = this.dfsCluster.getFileSystem();
+     this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
+     this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
+     isDFSRunning = true;
+     return this.dfsCluster;
+   }
+ 
+   /**
+    * Closes the mini DFS cluster's file system and shuts the cluster down. Does nothing when
+    * no cluster has been started. Afterwards the DFS-related state of this object is cleared,
+    * so {@link #isRunningDFSCluster()} and {@link #isRunningCluster()} no longer report a
+    * running DFS.
+    *
+    * @throws Exception if shutting down the cluster fails
+    */
+   public void shutdownMiniDFSCluster() throws Exception {
+     if (this.dfsCluster == null) {
+       return;
+     }
+     try {
+       FileSystem fs = this.dfsCluster.getFileSystem();
+       if (fs != null) fs.close();
+     } catch (IOException e) {
+       // Best effort: a failure to close the file system must not prevent the shutdown.
+       System.err.println("error closing file system: " + e);
+     }
+     try {
+       // The below throws an exception per dn, AsynchronousCloseException.
+       this.dfsCluster.shutdown();
+     } finally {
+       // Drop references to the stopped cluster and its closed file system.
+       this.dfsCluster = null;
+       this.defaultFS = null;
+       isDFSRunning = false;
+     }
+   }
+ 
+   /**
+    * @return {@code true} when a default file system has been assigned. This checks
+    * {@code defaultFS}, which is set when the mini DFS cluster is started, and not the
+    * {@code isDFSRunning} flag.
+    */
+   public boolean isRunningDFSCluster() {
+     return this.defaultFS != null;
+   }
+ 
+   /**
+    * @return the mini DFS cluster, or {@code null} if none has been started
+    */
+   public MiniDFSCluster getMiniDFSCluster() {
+     return this.dfsCluster;
+   }
+ 
+   /**
+    * @return the file system obtained from the mini DFS cluster when it was started, or
+    * {@code null} if no mini DFS cluster has been started
+    */
+   public FileSystem getDefaultFileSystem() {
+     return this.defaultFS;
+   }
+ 
+   /**
+    * @return the HBase test utility created by {@link #startMiniCluster(int, String[])}, or
+    * {@code null} if that method has not been called
+    */
+   public HBaseTestClusterUtil getHBaseUtil() {
+     return hbaseUtil;
+   }
+ 
+   ////////////////////////////////////////////////////////
+   // Catalog Section
+   ////////////////////////////////////////////////////////
+   /**
+    * Starts a standalone {@link MiniCatalogServer} configured with the in-memory catalog
+    * store and publishes the address it bound to through {@code ConfVars.CATALOG_ADDRESS}.
+    *
+    * @return the started mini catalog server
+    * @throws Exception if a catalog server is already running or the server cannot start
+    */
+   public MiniCatalogServer startCatalogCluster() throws Exception {
+     if (isCatalogServerRunning) {
+       throw new IOException("Catalog Cluster already running");
+     }
+ 
+     TajoConf clusterConf = getConfiguration();
+ 
+     conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
+     String catalogUri = "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db";
+     conf.set(CatalogConstants.CATALOG_URI, catalogUri);
+     LOG.info("Apache Derby repository is set to " + conf.get(CatalogConstants.CATALOG_URI));
+     // Port 0 lets the server pick a free port; the real address is written back below.
+     conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
+ 
+     catalogServer = new MiniCatalogServer(conf);
+     InetSocketAddress boundAddress = catalogServer.getCatalogServer().getBindAddress();
+     clusterConf.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(boundAddress));
+     isCatalogServerRunning = true;
+     return this.catalogServer;
+   }
+ 
+   /**
+    * Shuts down the mini catalog server if one exists and clears the
+    * {@code isCatalogServerRunning} flag in either case.
+    */
+   public void shutdownCatalogCluster() {
+     if (catalogServer != null) {
+       this.catalogServer.shutdown();
+     }
+     isCatalogServerRunning = false;
+   }
+ 
+   /**
+    * @return the mini catalog server created by {@link #startCatalogCluster()}, or
+    * {@code null} if that method has not been called
+    */
+   public MiniCatalogServer getMiniCatalogCluster() {
+     return this.catalogServer;
+   }
+ 
+   /**
+    * @return the value of {@code isHiveCatalogStoreUse}, which the catalog setup sets to
+    * {@code true} when the Hive catalog store is selected
+    */
+   public boolean isHiveCatalogStoreRunning() {
+     return isHiveCatalogStoreUse;
+   }
+ 
+   ////////////////////////////////////////////////////////
+   // Tajo Cluster Section
+   ////////////////////////////////////////////////////////
+   /**
+    * Configures and starts the Tajo master and workers of the mini cluster.
+    *
+    * @param testBuildDir local directory used for the worker temporary directory, the catalog
+    *                     database and, in local mode, the Tajo root directory
+    * @param numSlaves    requested number of workers, forwarded to the worker start-up helper
+    * @param local        {@code true} to keep the Tajo root directory on the local file system;
+    *                     {@code false} to place it on the mini DFS cluster and register a
+    *                     default file tablespace for it
+    * @throws Exception if the master or a worker fails to start
+    */
+   private void startMiniTajoCluster(File testBuildDir,
+                                                final int numSlaves,
+                                                boolean local) throws Exception {
+     TajoConf c = getConfiguration();
+     // Port 0 everywhere: each service binds to a free port and the real address is read back
+     // after start-up.
+     c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0");
+     c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0");
+     c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
+     c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
+     c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() +
"/tajo-localdir");
+     c.setIntVar(ConfVars.REST_SERVICE_PORT, 0);
+ 
+     LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
+ 
+     if (!local) {
+       // Distributed mode: the Tajo root lives on the mini DFS cluster.
+       String tajoRootDir = getMiniDFSCluster().getFileSystem().getUri().toString() + "/tajo";
+       c.setVar(ConfVars.ROOT_DIR, tajoRootDir);
+ 
+       URI defaultTsUri = TajoConf.getWarehouseDir(c).toUri();
+       FileTablespace defaultTableSpace =
 -          new FileTablespace(TablespaceManager.DEFAULT_TABLESPACE_NAME, defaultTsUri);
++          new FileTablespace(TablespaceManager.DEFAULT_TABLESPACE_NAME, defaultTsUri, null);
+       defaultTableSpace.init(conf);
+       TablespaceManager.addTableSpaceForTest(defaultTableSpace);
+ 
+     } else {
+       c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
+     }
+ 
+     setupCatalogForTesting(c, testBuildDir);
+ 
+     tajoMaster = new TajoMaster();
+     tajoMaster.init(c);
+     tajoMaster.start();
+ 
+     // Copy the addresses known after master start-up into this.conf so that workers and
+     // clients created from it can reach the master.
+     this.conf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, c.getVar(ConfVars.WORKER_PEER_RPC_ADDRESS));
+     this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, c.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+ 
+     InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
+ 
+     this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+         tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
+     this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+     this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
+     
+     InetSocketAddress tajoRestAddress = tajoMaster.getContext().getRestServer().getBindAddress();
+     
+     this.conf.setIntVar(ConfVars.REST_SERVICE_PORT, tajoRestAddress.getPort());
+ 
+     startTajoWorkers(numSlaves);
+ 
+     isTajoClusterRunning = true;
+     LOG.info("Mini Tajo cluster is up");
+     LOG.info("====================================================================================");
+     LOG.info("=                           MiniTajoCluster starts up                    
         =");
+     LOG.info("====================================================================================");
+     LOG.info("= * Master Address: " + tajoMaster.getMasterName());
+     LOG.info("= * CatalogStore: " + tajoMaster.getCatalogServer().getStoreClassName());
+     LOG.info("------------------------------------------------------------------------------------");
+     LOG.info("= * Warehouse Dir: " + TajoConf.getWarehouseDir(c));
+     LOG.info("= * Worker Tmp Dir: " + c.getVar(ConfVars.WORKER_TEMPORAL_DIR));
+     LOG.info("====================================================================================");
+   }
+ 
+   /**
+    * Selects the catalog store used by the test cluster and writes its settings into
+    * {@code c}. The Hive catalog store is chosen only when its class is loadable and the
+    * {@code CatalogConstants.STORE_CLASS} system property names it; otherwise the in-memory
+    * store is configured.
+    *
+    * @param c            configuration that receives the catalog settings
+    * @param testBuildDir local directory under which the catalog database path is built
+    * @throws IOException if the default database directory cannot be checked or created
+    */
+   private void setupCatalogForTesting(TajoConf c, File testBuildDir) throws IOException {
+     final String HIVE_CATALOG_CLASS_NAME = "org.apache.tajo.catalog.store.HiveCatalogStore";
+     boolean hiveCatalogClassExists = false;
+     try {
+       getClass().getClassLoader().loadClass(HIVE_CATALOG_CLASS_NAME);
+       hiveCatalogClassExists = true;
+     } catch (ClassNotFoundException e) {
+       LOG.info("HiveCatalogStore is not available.");
+     }
+     String driverClass = System.getProperty(CatalogConstants.STORE_CLASS);
+ 
+     // The class was already loaded successfully above, so it is not loaded a second time.
+     if (hiveCatalogClassExists && HIVE_CATALOG_CLASS_NAME.equals(driverClass)) {
+       String jdbcUri = "jdbc:derby:;databaseName=" + testBuildDir.toURI().getPath() +
+           "/metastore_db;create=true";
+       c.set("hive.metastore.warehouse.dir", TajoConf.getWarehouseDir(c).toString() + "/default");
+       c.set("javax.jdo.option.ConnectionURL", jdbcUri);
+       c.set(TajoConf.ConfVars.WAREHOUSE_DIR.varname, conf.getVar(ConfVars.WAREHOUSE_DIR));
+       c.set(CatalogConstants.STORE_CLASS, HIVE_CATALOG_CLASS_NAME);
+       // Re-read the warehouse directory: it was (re)assigned just above.
+       Path defaultDatabasePath = new Path(TajoConf.getWarehouseDir(c).toString() + "/default");
+       FileSystem fs = defaultDatabasePath.getFileSystem(c);
+       if (!fs.exists(defaultDatabasePath)) {
+         fs.mkdirs(defaultDatabasePath);
+       }
+       isHiveCatalogStoreUse = true;
+     } else { // for derby
+       c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
+       c.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db");
+     }
+     c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
+   }
+ 
+   /**
+    * Starts the workers of the mini cluster and records them in {@code tajoWorkers}. Each
+    * worker gets its own copy of the cluster configuration with all worker addresses set to
+    * {@code localhost:0}.
+    *
+    * @param numSlaves requested number of workers
+    * @throws Exception if a worker fails to start
+    */
+   private void startTajoWorkers(int numSlaves) throws Exception {
+     // NOTE(review): the loop bound is hard-coded to 1, so exactly one worker is started and
+     // numSlaves is not used — confirm whether this is intentional before changing it.
+     for(int i = 0; i < 1; i++) {
+       TajoWorker tajoWorker = new TajoWorker();
+ 
+       TajoConf workerConf  = new TajoConf(this.conf);
+ 
+       workerConf.setVar(ConfVars.WORKER_INFO_ADDRESS, "localhost:0");
+       workerConf.setVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS, "localhost:0");
+       workerConf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
+ 
+       workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0");
+       
+       tajoWorker.startWorker(workerConf, new String[0]);
+ 
+       LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
+       tajoWorkers.add(tajoWorker);
+     }
+   }
+ 
+   /**
+    * @return the Tajo master of the mini cluster, or {@code null} when it has not been started
+    * or has been shut down by {@link #shutdownMiniTajoCluster()}
+    */
+   public TajoMaster getMaster() {
+     return this.tajoMaster;
+   }
+ 
+   /**
+    * @return the internal, mutable list of started workers (not a copy)
+    */
+   public List<TajoWorker> getTajoWorkers() {
+     return this.tajoWorkers;
+   }
+ 
+   /**
+    * Stops the Tajo master (if any), force-stops every worker, and then forgets both the
+    * workers and the master.
+    */
+   public void shutdownMiniTajoCluster() {
+     if (tajoMaster != null) {
+       tajoMaster.stop();
+     }
+ 
+     for (TajoWorker worker : tajoWorkers) {
+       worker.stopWorkerForce();
+     }
+     tajoWorkers.clear();
+ 
+     tajoMaster = null;
+   }
+ 
+   ////////////////////////////////////////////////////////
+   // Meta Cluster Section
+   ////////////////////////////////////////////////////////
+   /**
+    * Fails when any part of the mini cluster (Tajo daemons, catalog server or DFS) is flagged
+    * as running; returns normally otherwise.
+    *
+    * @throws java.io.IOException If a cluster -- dfs or engine -- already running.
+    */
+   void isRunningCluster() throws IOException {
+     boolean anythingRunning = isTajoClusterRunning || isCatalogServerRunning || isDFSRunning;
+     if (anythingRunning) {
+       throw new IOException("Cluster already running at " +
+           this.clusterTestBuildDir);
+     }
+   }
+ 
+   /**
+    * Starts up a mini cluster in distributed mode by delegating to
+    * {@link #startMiniCluster(int, String[])} without explicit datanode host names.
+    *
+    * @param numSlaves the requested number of workers, forwarded unchanged
+    * @throws Exception if any part of the cluster fails to start
+    */
+   public void startMiniCluster(final int numSlaves)
+       throws Exception {
+     startMiniCluster(numSlaves, null);
+   }
+ 
+   /**
+    * Starts a mini DFS cluster, prepares the HBase test utility and then starts the Tajo
+    * mini cluster on top of the DFS.
+    *
+    * @param numSlaves     requested number of workers; also the number of datanodes when no
+    *                      hosts are given
+    * @param dataNodeHosts datanode host names; when non-empty, its length determines the
+    *                      number of datanodes
+    * @throws Exception if a cluster is already running or any component fails to start
+    */
+   public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception {
+     final boolean hostsGiven = dataNodeHosts != null && dataNodeHosts.length != 0;
+     final int numDataNodes = hostsGiven ? dataNodeHosts.length : numSlaves;
+ 
+     LOG.info("Starting up minicluster with 1 master(s) and " +
+         numSlaves + " worker(s) and " + numDataNodes + " datanode(s)");
+ 
+     // If we already bring up the cluster, fail.
+     isRunningCluster();
+     if (clusterTestBuildDir != null) {
+       LOG.info("Using passed path: " + clusterTestBuildDir);
+     }
+ 
+     // Storage first: the Tajo daemons are pointed at this DFS.
+     startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts);
+     this.dfsCluster.waitClusterUp();
+ 
+     conf.setInt("hbase.hconnection.threads.core", 5);
+     conf.setInt("hbase.hconnection.threads.max", 50);
+     hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
+ 
+     startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
+   }
+ 
+   /**
+    * Starts the Tajo mini cluster in local mode, i.e. without starting a mini DFS cluster.
+    *
+    * @param numSlaves requested number of workers, forwarded unchanged
+    * @throws Exception if a cluster is already running or the Tajo daemons fail to start
+    */
+   public void startMiniClusterInLocal(final int numSlaves) throws Exception {
+     isRunningCluster();
+ 
+     if (clusterTestBuildDir != null) {
+       LOG.info("Using passed path: " + clusterTestBuildDir);
+     }
+ 
+     startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true);
+   }
+ 
+   /**
+    * Shuts down everything this testing cluster started, in order: the Tajo master and
+    * workers, the catalog server, the mini DFS cluster, the local test directory and the
+    * HBase/ZooKeeper test clusters.
+    *
+    * @throws IOException if the local test directory cannot be deleted or the HBase utility
+    *                     fails to stop
+    */
+   public void shutdownMiniCluster() throws IOException {
+     LOG.info("========================================");
+     LOG.info("Minicluster is stopping");
+     LOG.info("========================================");
+ 
+     pauseDuringShutdown(3000);
+ 
+     shutdownMiniTajoCluster();
+ 
+     if(this.catalogServer != null) {
+       // shutdownCatalogCluster() also clears isCatalogServerRunning.
+       shutdownCatalogCluster();
+     }
+ 
+     pauseDuringShutdown(3000);
+ 
+     if(this.dfsCluster != null) {
+       try {
+         FileSystem fs = this.dfsCluster.getFileSystem();
+         if (fs != null) fs.close();
+       } catch (IOException e) {
+         System.err.println("error closing file system: " + e);
+       } finally {
+         // Always stop the DFS daemons, even when closing the file system failed.
+         try {
+           this.dfsCluster.shutdown();
+         } finally {
+           isDFSRunning = false;
+         }
+       }
+     }
+ 
+     if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
+       if(!ShutdownHookManager.get().isShutdownInProgress()) {
+         //TODO clean test dir when ShutdownInProgress
+         LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
+         localFS.delete(new Path(clusterTestBuildDir.toString()), true);
+         localFS.close();
+       }
+       this.clusterTestBuildDir = null;
+     }
+ 
+     if(hbaseUtil != null) {
+       hbaseUtil.stopZooKeeperCluster();
+       hbaseUtil.stopHBaseCluster();
+     }
+ 
+     LOG.info("Minicluster is down");
+     isTajoClusterRunning = false;
+   }
+ 
+   /**
+    * Sleeps for the given time between shutdown steps. If the thread is interrupted, the
+    * sleep ends early and the interrupt status is restored for the caller.
+    */
+   private static void pauseDuringShutdown(long millis) {
+     try {
+       Thread.sleep(millis);
+     } catch (InterruptedException e) {
+       LOG.warn("Interrupted while waiting during minicluster shutdown", e);
+       Thread.currentThread().interrupt();
+     }
+   }
+ 
+   /**
+    * Creates a new client from this cluster's configuration. The caller is responsible for
+    * closing it.
+    *
+    * @return a new {@link TajoClientImpl}
+    * @throws Exception if the client cannot be created
+    */
+   public TajoClient newTajoClient() throws Exception {
+     return new TajoClientImpl(ServiceTrackerFactory.get(getConfiguration()));
+   }
+ 
+   /**
+    * Creates one external table per entry of {@code names} on the shared TPC-H test cluster
+    * and then runs {@code query} with the supplied client.
+    *
+    * @param names       table names
+    * @param schemas     schema of each table, parallel to {@code names}
+    * @param tableOption table options applied to every table
+    * @param tables      rows of each table, parallel to {@code names}
+    * @param query       query to execute after the tables are created
+    * @param client      client used to execute the query; it is not closed here
+    * @return the result of the query
+    * @throws Exception if table creation or query execution fails
+    */
+   public static ResultSet run(String[] names,
+                               Schema[] schemas,
+                               KeyValueSet tableOption,
+                               String[][] tables,
+                               String query,
+                               TajoClient client) throws Exception {
+     TajoTestingCluster cluster = TpchTestBase.getInstance().getTestingCluster();
+ 
+     FileSystem fileSystem = cluster.getDefaultFileSystem();
+     Path warehouseDir = TajoConf.getWarehouseDir(cluster.getConfiguration());
+     fileSystem.mkdirs(warehouseDir);
+ 
+     for (int idx = 0; idx < names.length; idx++) {
+       createTable(names[idx], schemas[idx], tableOption, tables[idx]);
+     }
+ 
+     Thread.sleep(1000);
+     return client.executeQueryAndGetResult(query);
+   }
+ 
+   /**
+    * Same as {@link #run(String[], Schema[], KeyValueSet, String[][], String, TajoClient)},
+    * but waits for the master of the shared TPC-H test cluster to be running and uses a
+    * temporary client, which is closed before returning.
+    *
+    * @return the result of the query
+    * @throws Exception if table creation or query execution fails
+    */
+   public static ResultSet run(String[] names,
+                               Schema[] schemas,
+                               KeyValueSet tableOption,
+                               String[][] tables,
+                               String query) throws Exception {
+     TajoTestingCluster cluster = TpchTestBase.getInstance().getTestingCluster();
+ 
+     // Poll once per second until the master reports that it is running.
+     while (!cluster.getMaster().isMasterRunning()) {
+       Thread.sleep(1000);
+     }
+ 
+     TajoClient client = new TajoClientImpl(ServiceTrackerFactory.get(cluster.getConfiguration()));
+     try {
+       return run(names, schemas, tableOption, tables, query, client);
+     } finally {
+       client.close();
+     }
+   }
+ 
+   /**
+    * Waits until the master of {@code util} is running and then creates a client from the
+    * cluster's configuration. The caller is responsible for closing the client.
+    *
+    * @param util the testing cluster to connect to
+    * @return a new {@link TajoClientImpl}
+    * @throws SQLException declared by the client construction
+    * @throws InterruptedException if interrupted while waiting for the master
+    */
+   public static TajoClient newTajoClient(TajoTestingCluster util) throws SQLException, InterruptedException {
+     // Poll once per second until the master reports that it is running.
+     while (!util.getMaster().isMasterRunning()) {
+       Thread.sleep(1000);
+     }
+     return new TajoClientImpl(ServiceTrackerFactory.get(util.getConfiguration()));
+   }
+ 
+   /**
+    * Creates an external table whose rows are written into a single data file; delegates to
+    * the five-argument overload with a file count of 1.
+    *
+    * @param tableName   table name, optionally qualified
+    * @param schema      table schema
+    * @param tableOption table options
+    * @param tableDatas  rows, one string per row
+    * @throws Exception if the data cannot be written or the table cannot be created
+    */
+   public static void createTable(String tableName, Schema schema,
+                                  KeyValueSet tableOption, String[] tableDatas) throws Exception
{
+     createTable(tableName, schema, tableOption, tableDatas, 1);
+   }
+ 
+   /**
+    * Writes the given rows as text files under the warehouse directory of the shared TPC-H
+    * test cluster and registers them as an external {@code TEXT} table.
+    *
+    * @param tableName    table name; a fully qualified name is mapped to a
+    *                     {@code qualifier/name} directory
+    * @param schema       table schema
+    * @param tableOption  table options
+    * @param tableDatas   rows, one string per row; each is written followed by a newline,
+    *                     encoded as UTF-8
+    * @param numDataFiles number of data files to spread the rows over; values below 1 are
+    *                     treated as 1
+    * @throws Exception if the data cannot be written or the table cannot be created
+    */
+   public static void createTable(String tableName, Schema schema,
+                                  KeyValueSet tableOption, String[] tableDatas, int numDataFiles)
+       throws Exception {
+     TpchTestBase instance = TpchTestBase.getInstance();
+     TajoTestingCluster util = instance.getTestingCluster();
+     TajoClient client = newTajoClient(util);
+     try {
+       FileSystem fs = util.getDefaultFileSystem();
+       Path rootDir = TajoConf.getWarehouseDir(util.getConfiguration());
+       if (!fs.exists(rootDir)) {
+         fs.mkdirs(rootDir);
+       }
+       Path tablePath;
+       if (CatalogUtil.isFQTableName(tableName)) {
+         Pair<String, String> name = CatalogUtil.separateQualifierAndName(tableName);
+         tablePath = new Path(rootDir, new Path(name.getFirst(), name.getSecond()));
+       } else {
+         tablePath = new Path(rootDir, tableName);
+       }
+ 
+       fs.mkdirs(tablePath);
+       if (tableDatas.length > 0) {
+         // Clamp the file count so a zero value cannot cause a division by zero, and keep at
+         // least one record per file when there are more files than rows.
+         int recordPerFile = Math.max(1, tableDatas.length / Math.max(1, numDataFiles));
+         FSDataOutputStream out = null;
+         try {
+           for (int j = 0; j < tableDatas.length; j++) {
+             if (out == null || j % recordPerFile == 0) {
+               if (out != null) {
+                 out.close();
+                 out = null;
+               }
+               Path dfsPath = new Path(tablePath, tableName + j + ".tbl");
+               out = fs.create(dfsPath);
+             }
+             // Explicit charset: the platform default must not influence the test data.
+             out.write((tableDatas[j] + "\n").getBytes(Charsets.UTF_8));
+           }
+         } finally {
+           // Close the last (or the failing) stream even when a write throws.
+           if (out != null) {
+             out.close();
+           }
+         }
+       }
+       TableMeta meta = CatalogUtil.newTableMeta("TEXT", tableOption);
+       client.createExternalTable(tableName, schema, tablePath.toUri(), meta);
+     } finally {
+       client.close();
+     }
+   }
+ 
+   /**
+    * Write lines to a file, encoded as UTF-8, each followed by a newline character.
+    *
+    * @param file File to write lines to
+    * @param lines Strings written to the file
+    * @throws java.io.IOException if writing, flushing or closing the file fails
+    */
+   private static void writeLines(File file, String... lines)
+       throws IOException {
+     Writer writer = Files.newWriter(file, Charsets.UTF_8);
+     // Closing a writer flushes buffered data, so a failure on close means lost output and
+     // must be reported. It is swallowed only when an earlier exception is already in flight.
+     boolean threw = true;
+     try {
+       for (String line : lines) {
+         writer.write(line);
+         writer.write('\n');
+       }
+       threw = false;
+     } finally {
+       Closeables.close(writer, threw);
+     }
+   }
+ 
+   /**
+    * Sets a configuration value on the running master's configuration and on every worker's
+    * configuration. Requires the master to have been started.
+    *
+    * @param key   configuration key
+    * @param value configuration value
+    */
+   public void setAllTajoDaemonConfValue(String key, String value) {
+     tajoMaster.getContext().getConf().set(key, value);
+     setAllWorkersConfValue(key, value);
+   }
+ 
+   /**
+    * Sets a configuration value on the configuration of every started worker.
+    *
+    * @param key   configuration key
+    * @param value configuration value
+    */
+   public void setAllWorkersConfValue(String key, String value) {
+     for (TajoWorker eachWorker: tajoWorkers) {
+       eachWorker.getConfig().set(key, value);
+     }
+   }
+ 
+   /**
+    * Waits for the query to leave the waiting-for-schedule state, polling every 50
+    * milliseconds.
+    *
+    * @param queryId id of the query to wait for
+    * @throws Exception if the wait times out
+    */
+   public void waitForQuerySubmitted(QueryId queryId) throws Exception {
+     waitForQuerySubmitted(queryId, 50);
+   }
+ 
+   /**
+    * Polls until a query master task exists for the query and it is no longer waiting to be
+    * scheduled. Gives up after more than 200 polling rounds.
+    *
+    * @param queryId id of the query to wait for
+    * @param delay   sleep time between polls, in milliseconds
+    * @throws Exception an {@link IOException} when the wait times out
+    */
+   public void waitForQuerySubmitted(QueryId queryId, int delay) throws Exception {
+     QueryMasterTask task = null;
+     int rounds = 0;
+ 
+     while (task == null || TajoClientUtil.isQueryWaitingForSchedule(task.getState())) {
+       try {
+         Thread.sleep(delay);
+ 
+         if (task == null) {
+           task = getQueryMasterTask(queryId);
+         }
+       } catch (InterruptedException ignored) {
+         // An interrupted sleep just ends this round early; the round still counts.
+       }
+ 
+       rounds++;
+       if (rounds > 200) {
+         throw new IOException("Timed out waiting for query to start");
+       }
+     }
+   }
+ 
+   public void waitForQueryState(Query query, TajoProtos.QueryState expected, int delay)
throws Exception {
+     int i = 0;
+     while (query == null || query.getSynchronizedState() != expected) {
+       try {
+         Thread.sleep(delay);
+       } catch (InterruptedException e) {
+       }
+       if (++i > 200) {
+         throw new IOException("Timed out waiting. expected: " + expected +
+             ", actual: " + query != null ? String.valueOf(query.getSynchronizedState())
: String.valueOf(query));
+       }
+     }
+   }
+ 
+   /**
+    * Polls until the stage reaches the expected state. Gives up after more than 200 polling
+    * rounds.
+    *
+    * @param stage    stage to observe; a {@code null} stage never matches and ends in a
+    *                 timeout
+    * @param expected state to wait for
+    * @param delay    sleep time between polls, in milliseconds
+    * @throws Exception an {@link IOException} when the wait times out
+    */
+   public void waitForStageState(Stage stage, StageState expected, int delay) throws Exception {
+     int rounds = 0;
+     while (stage == null || stage.getSynchronizedState() != expected) {
+       try {
+         Thread.sleep(delay);
+       } catch (InterruptedException ignored) {
+         // An interrupted sleep just ends this round early; the round still counts.
+       }
+ 
+       rounds++;
+       if (rounds > 200) {
+         throw new IOException("Timed out waiting");
+       }
+     }
+   }
+ 
+   public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+     QueryMasterTask qmt = null;
+     for (TajoWorker worker : getTajoWorkers()) {
+       qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true);
+       if (qmt != null && queryId.equals(qmt.getQueryId())) {
+         break;
+       }
+     }
+     return qmt;
+   }
+ }


Mime
View raw message