TAJO-261: Rearrange default port numbers and config names. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d5128328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d5128328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d5128328
Branch: refs/heads/master
Commit: d51283284688dfa25550cfe773dfc52391d6b266
Parents: 93b435d
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Oct 21 15:37:50 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Oct 21 15:37:50 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tajo/catalog/CatalogConstants.java | 1 -
.../src/main/resources/catalog-default.xml | 32 ----
.../org/apache/tajo/catalog/CatalogServer.java | 8 +-
.../src/test/resources/catalog-default.xml | 37 ----
.../java/org/apache/tajo/TajoConstants.java | 2 +-
.../java/org/apache/tajo/conf/TajoConf.java | 181 +++++++++++++------
.../java/org/apache/tajo/util/NetUtils.java | 102 +++++++++++
.../org/apache/tajo/benchmark/BenchmarkSet.java | 4 +-
.../java/org/apache/tajo/benchmark/TPCH.java | 7 +-
.../main/java/org/apache/tajo/cli/TajoCli.java | 138 +++++++++-----
.../java/org/apache/tajo/client/SQLStates.java | 33 ++++
.../java/org/apache/tajo/client/TajoClient.java | 25 ++-
.../java/org/apache/tajo/client/TajoDump.java | 16 +-
.../apache/tajo/engine/planner/LogicalPlan.java | 4 +-
.../engine/planner/PhysicalPlannerImpl.java | 8 +-
.../engine/planner/global/GlobalPlanner.java | 2 +-
.../planner/physical/ExternalSortExec.java | 2 +-
.../org/apache/tajo/master/GlobalEngine.java | 4 +-
.../java/org/apache/tajo/master/TajoMaster.java | 56 +++---
.../tajo/master/TajoMasterClientService.java | 61 +++++--
.../apache/tajo/master/TajoMasterService.java | 6 +-
.../apache/tajo/master/YarnContainerProxy.java | 4 +-
.../tajo/master/YarnTaskRunnerLauncherImpl.java | 2 +-
.../tajo/master/querymaster/QueryInfo.java | 4 +-
.../master/querymaster/QueryJobManager.java | 3 +-
.../tajo/master/querymaster/QueryMaster.java | 10 +-
.../master/querymaster/QueryMasterTask.java | 7 +-
.../master/querymaster/QueryUnitAttempt.java | 2 -
.../tajo/master/querymaster/Repartitioner.java | 2 +-
.../tajo/master/querymaster/SubQuery.java | 4 +-
.../master/rm/TajoWorkerResourceManager.java | 9 +-
.../apache/tajo/master/rm/WorkerResource.java | 18 +-
.../main/java/org/apache/tajo/util/JSPUtil.java | 9 +-
.../apache/tajo/webapp/StaticHttpServer.java | 9 +-
.../tajo/worker/TajoResourceAllocator.java | 4 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 86 ++++-----
.../tajo/worker/TajoWorkerClientService.java | 1 -
.../tajo/worker/TajoWorkerManagerService.java | 8 +-
.../main/java/org/apache/tajo/worker/Task.java | 7 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 9 +-
.../apache/tajo/worker/TaskRunnerManager.java | 8 +-
.../tajo/worker/YarnResourceAllocator.java | 2 +-
.../src/main/proto/ClientProtocol.proto | 171 ------------------
.../src/main/proto/ClientProtos.proto | 5 +-
.../main/proto/TajoMasterClientProtocol.proto | 2 +-
.../src/main/resources/catalog-default.xml | 32 ++++
.../src/main/resources/tajo-default.xml | 88 +--------
.../main/resources/webapps/admin/cluster.jsp | 4 +-
.../src/main/resources/webapps/admin/index.jsp | 43 +++--
.../src/main/resources/webapps/admin/query.jsp | 15 +-
.../org/apache/tajo/BackendTestingUtil.java | 30 ++-
.../apache/tajo/LocalTajoTestingUtility.java | 5 +-
.../org/apache/tajo/MiniTajoYarnCluster.java | 3 +-
.../org/apache/tajo/TajoTestingCluster.java | 49 +++--
.../org/apache/tajo/client/TestTajoClient.java | 129 ++++++-------
.../org/apache/tajo/storage/TestRowFile.java | 3 +-
.../src/test/resources/catalog-default.xml | 37 ----
.../src/test/resources/tajo-default.xml | 43 -----
.../tajo/pullserver/PullServerAuxService.java | 11 +-
.../tajo/pullserver/TajoPullServerService.java | 4 +-
.../tajo/storage/AbstractStorageManager.java | 12 +-
.../java/org/apache/tajo/storage/RawFile.java | 69 +++++--
.../tajo/storage/StorageManagerFactory.java | 39 ++--
.../tajo/storage/v2/DiskFileScanScheduler.java | 4 +-
.../java/org/apache/tajo/storage/v2/RCFile.java | 1 -
.../apache/tajo/storage/v2/RCFileScanner.java | 1 -
.../apache/tajo/storage/v2/ScanScheduler.java | 4 +-
.../tajo/storage/v2/StorageManagerV2.java | 2 +-
.../tajo/storage/v2/TestCSVCompression.java | 2 +-
.../apache/tajo/storage/v2/TestCSVScanner.java | 3 +-
.../apache/tajo/storage/v2/TestStorages.java | 3 +-
tajo-dist/src/main/bin/tajo | 50 +++--
tajo-dist/src/main/conf/tajo-env.sh | 15 +-
.../java/org/apache/tajo/util/NetUtils.java | 102 -----------
75 files changed, 882 insertions(+), 1038 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7226328..3668ad2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-261: Rearrange default port numbers and config names. (hyunsik)
+
TAJO-236: Implement LogicalPlanVerifier to check if a logical plan is
valid. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 94736f7..d43efbf 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -21,7 +21,6 @@ package org.apache.tajo.catalog;
public class CatalogConstants {
public static final String STORE_CLASS="tajo.catalog.store.class";
- //public static final String JDBC_DRIVER = "tajo.catalog.jdbc.driver";
public static final String CONNECTION_ID = "tajo.catalog.jdbc.connection.id";
public static final String CONNECTION_PASSWORD = "tajo.catalog.jdbc.connection.password";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
deleted file mode 100644
index 0e9e109..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.catalog.store.class</name>
- <value>org.apache.tajo.catalog.store.DerbyStore</value>
- </property>
-
- <property>
- <name>tajo.catalog.jdbc.uri</name>
- <value>jdbc:derby:/tmp/tcat-${user.name}/db;create=true</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 70cdd28..672912a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -104,8 +104,7 @@ public class CatalogServer extends AbstractService {
Constructor<?> cons;
try {
- Class<?> storeClass =
- this.conf.getClass(CatalogConstants.STORE_CLASS, DerbyStore.class);
+ Class<?> storeClass = this.conf.getClass(CatalogConstants.STORE_CLASS, DerbyStore.class);
LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
cons = storeClass.
@@ -123,7 +122,8 @@ public class CatalogServer extends AbstractService {
}
public String getCatalogServerName() {
- return bindAddressStr + ", class=" + this.store.getClass().getSimpleName() + ", jdbc=" + conf.get(CatalogConstants.JDBC_URI);
+ return bindAddressStr + ", store=" + this.store.getClass().getSimpleName() + ", jdbc="
+ + conf.get(CatalogConstants.JDBC_URI);
}
private void initBuiltinFunctions(List<FunctionDesc> functions)
@@ -144,7 +144,7 @@ public class CatalogServer extends AbstractService {
this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
this.bindAddressStr = NetUtils.normalizeInetSocketAddress(bindAddress);
- conf.set(ConfVars.CATALOG_ADDRESS.varname, bindAddressStr);
+ conf.setVar(ConfVars.CATALOG_ADDRESS, bindAddressStr);
} catch (Exception e) {
LOG.error("CatalogServer startup failed", e);
throw new CatalogException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
deleted file mode 100644
index a6b2183..0000000
--- a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.catalog.master.addr</name>
- <value>0.0.0.0:0</value>
- </property>
-
- <property>
- <name>tajo.catalog.store.class</name>
- <value>org.apache.tajo.catalog.store.MemStore</value>
- </property>
-
- <property>
- <name>tajo.catalog.jdbc.uri</name>
- <value>jdbc:derby:target/test-data/tcat/db;create=true</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
index c84b8de..5bce0ae 100644
--- a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
@@ -19,11 +19,11 @@
package org.apache.tajo;
public class TajoConstants {
+ public static String TAJO_VERSION = "0.2.0-SNAPSHOT";
public static String SYSTEM_CONF_FILENAME = "system_conf.xml";
public static String SYSTEM_DIR_NAME = "system";
public static String WAREHOUSE_DIR_NAME = "warehouse";
- public static String STAGING_DIR_NAME = "staging";
public static String SYSTEM_RESOURCE_DIR_NAME = "resource";
public static String RESULT_DIR_NAME="RESULT";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 8c2585e..475877b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -21,11 +21,14 @@ package org.apache.tajo.conf;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.util.NetUtils;
import java.io.PrintStream;
+import java.net.InetSocketAddress;
import java.util.Map;
public class TajoConf extends YarnConfiguration {
@@ -57,41 +60,70 @@ public class TajoConf extends YarnConfiguration {
}
public static enum ConfVars {
+
//////////////////////////////////
// Tajo System Configuration
//////////////////////////////////
// a username for a running Tajo cluster
- TAJO_USERNAME("tajo.cluster.username", "tajo"),
+ ROOT_DIR("tajo.rootdir", "file:///tmp/tajo-${user.name}/"),
+ USERNAME("tajo.username", "${user.name}"),
// Configurable System Directories
- ROOT_DIR("tajo.rootdir", "/tajo"),
- WAREHOUSE_DIR("tajo.wahrehouse-dir", EMPTY_VALUE),
- STAGING_ROOT_DIR("tajo.staging.root.dir", ""),
- TASK_LOCAL_DIR("tajo.task.localdir", ""), // local directory for temporal files
- SYSTEM_CONF_PATH("tajo.system.conf.path", ""),
- SYSTEM_CONF_REPLICA_COUNT("tajo.system.conf.replica.count", 20),
-
- // Service Addresses
- TASKRUNNER_LISTENER_ADDRESS("tajo.master.taskrunnerlistener.addr", "0.0.0.0:0"),
- CLIENT_SERVICE_ADDRESS("tajo.master.clientservice.addr", "127.0.0.1:9004"),
- TAJO_MASTER_SERVICE_ADDRESS("tajo.master.manager.addr", "0.0.0.0:9005"),
+ WAREHOUSE_DIR("tajo.warehouse.directory", EMPTY_VALUE),
+ STAGING_ROOT_DIR("tajo.staging.directory", "/tmp/tajo-${user.name}/staging"),
+
+ SYSTEM_CONF_PATH("tajo.system-conf.path", EMPTY_VALUE),
+ SYSTEM_CONF_REPLICA_COUNT("tajo.system-conf.replica-count", 20),
+
+ // Tajo Master Service Addresses
+ TAJO_MASTER_UMBILICAL_RPC_ADDRESS("tajo.master.umbilical-rpc.address", "localhost:26001"),
+ TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002"),
+ TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080"),
+
+ // Tajo Worker Service Addresses
+ WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080"),
+ WORKER_PEER_RPC_ADDRESS("tajo.worker.peer-rpc.address", "0.0.0.0:28091"),
+ WORKER_CLIENT_RPC_ADDRESS("tajo.worker.client-rpc.address", "0.0.0.0:28092"),
+
+ // Tajo Worker Temporal Directories
+ WORKER_TEMPORAL_DIR("tajo.worker.tmpdir.locations", "/tmp/tajo-${user.name}/tmpdir"),
+
+ // Tajo Worker Resources
+ WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1),
+ WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024),
+ WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1),
+ WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
+
+ // Tajo Worker Dedicated Resources
+ WORKER_RESOURCE_DEDICATED("tajo.worker.resource.dedicated", false),
+ WORKER_RESOURCE_DEDICATED_MEMORY_RATIO("tajo.worker.resource.dedicated-memory-ratio", 0.8f),
+
+ // Tajo Worker History
+ WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 12 * 60), // 12 hours
// Resource Manager
- RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.YarnTajoResourceManager"),
+ RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager"),
- // Catalog Address
- CATALOG_ADDRESS("tajo.catalog.master.addr", "0.0.0.0:9002"),
+ // Catalog
+ CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005"),
//////////////////////////////////
- // Worker
+ // for Yarn Resource Manager
//////////////////////////////////
/** how many launching TaskRunners in parallel */
- AM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.master.taskrunnerlauncher.parallel.num", 16),
- MAX_WORKER_PER_NODE("tajo.query.max-workernum.per.node", 8),
+ YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512),
+ YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1),
+ YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", 16),
+ YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8),
+
+ //////////////////////////////////
+ // Query Configuration
+ //////////////////////////////////
+ QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60),
//////////////////////////////////
- // Pull Server
+ // Shuffle Configuration
//////////////////////////////////
PULLSERVER_PORT("tajo.pullserver.port", 0),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
@@ -103,34 +135,43 @@ public class TajoConf extends YarnConfiguration {
// for RCFile
HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true),
-
- //////////////////////////////////
- // Physical Executors
- //////////////////////////////////
- EXTENAL_SORT_BUFFER_NUM("tajo.sort.external.buffer", 1000000),
- BROADCAST_JOIN_THRESHOLD("tajo.join.broadcast.threshold", (long)5 * 1048576),
- INMEMORY_HASH_TABLE_DEFAULT_SIZE("tajo.join.inmemory.table.num", (long)1000000),
- INMEMORY_INNER_HASH_JOIN_THRESHOLD("tajo.join.inner.memhash.threshold", (long)256 * 1048576),
- INMEMORY_HASH_AGGREGATION_THRESHOLD("tajo.aggregation.hash.threshold", (long)256 * 1048576),
- INMEMORY_OUTER_HASH_AGGREGATION_THRESHOLD("tajo.join.outer.memhash.threshold", (long)256 * 1048576),
+ // for Storage Manager v2
+ STORAGE_MANAGER_VERSION_2("tajo.storage-manager.v2", false),
+ STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT("tajo.storage-manager.max-read-bytes", 8 * 1024 * 1024),
+ STORAGE_MANAGER_DISK_SCHEDULER_REPORT_INTERVAL("tajo.storage-manager.disk-scheduler.report-interval", 60 * 1000),
+ STORAGE_MANAGER_CONCURRENCY_PER_DISK("tajo.storage-manager.disk-scheduler.per-disk-concurrency", 2),
//////////////////////////////////////////
// Distributed Query Execution Parameters
//////////////////////////////////////////
- JOIN_TASK_VOLUME("tajo.join.task-volume.mb", 128),
- SORT_TASK_VOLUME("tajo.sort.task-volume.mb", 128),
- AGGREGATION_TASK_VOLUME("tajo.task-aggregation.volume.mb", 128),
+ DIST_QUERY_BROADCAST_JOIN_THRESHOLD("tajo.dist-query.join.broadcast.threshold-bytes", (long)5 * 1048576),
+
+ DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),
+ DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 128),
+ DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 128),
- JOIN_PARTITION_VOLUME("tajo.join.part-volume.mb", 128),
- SORT_PARTITION_VOLUME("tajo.sort.part-volume.mb", 256),
- AGGREGATION_PARTITION_VOLUME("tajo.aggregation.part-volume.mb", 256),
+ DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128),
+ DIST_QUERY_SORT_PARTITION_VOLUME("tajo.dist-query.sort.partition-volume-mb", 256),
+ DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256),
+
+ //////////////////////////////////
+ // Physical Executors
+ //////////////////////////////////
+ EXECUTOR_SORT_EXTENAL_BUFFER_SIZE("tajo.executor.sort.external-buffer-num", 1000000),
+ EXECUTOR_INNER_JOIN_INMEMORY_HASH_TABLE_SIZE("tajo.executor.join.inner.in-memory-table-num", (long)1000000),
+ EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-bytes",
+ (long)256 * 1048576),
+ EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD("tajo.eecutor.join.outer.in-memory-hash-threshold-bytes",
+ (long)256 * 1048576),
+ EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes",
+ (long)256 * 1048576),
//////////////////////////////////
// The Below is reserved
//////////////////////////////////
// GeoIP
- GEOIP_DATA("tajo.geoip.data", "/usr/local/share/GeoIP/GeoIP.dat"),
+ GEOIP_DATA("tajo.geoip.data", ""),
//////////////////////////////////
// Hive Configuration
@@ -205,13 +246,10 @@ public class TajoConf extends YarnConfiguration {
enum VarType {
STRING { void checkType(String value) throws Exception { } },
- INT { void checkType(String value) throws Exception { Integer
- .valueOf(value); } },
+ INT { void checkType(String value) throws Exception { Integer.valueOf(value); } },
LONG { void checkType(String value) throws Exception { Long.valueOf(value); } },
- FLOAT { void checkType(String value) throws Exception { Float
- .valueOf(value); } },
- BOOLEAN { void checkType(String value) throws Exception { Boolean
- .valueOf(value); } };
+ FLOAT { void checkType(String value) throws Exception { Float.valueOf(value); } },
+ BOOLEAN { void checkType(String value) throws Exception { Boolean.valueOf(value); } };
boolean isType(String value) {
try { checkType(value); } catch (Exception e) { return false; }
@@ -338,22 +376,27 @@ public class TajoConf extends YarnConfiguration {
}
}
+ public InetSocketAddress getSocketAddrVar(ConfVars var) {
+ final String address = getVar(var);
+ return NetUtils.createSocketAddr(address);
+ }
+
/////////////////////////////////////////////////////////////////////////////
// Tajo System Specific Methods
/////////////////////////////////////////////////////////////////////////////
- public static Path getTajoRootPath(TajoConf conf) {
+ public static Path getTajoRootDir(TajoConf conf) {
String rootPath = conf.getVar(ConfVars.ROOT_DIR);
Preconditions.checkNotNull(rootPath,
ConfVars.ROOT_DIR.varname + " must be set before a Tajo Cluster starts up");
return new Path(rootPath);
}
- public static Path getWarehousePath(TajoConf conf) {
+ public static Path getWarehouseDir(TajoConf conf) {
String warehousePath = conf.getVar(ConfVars.WAREHOUSE_DIR);
if (warehousePath == null || warehousePath.equals("")) {
- Path rootDir = getTajoRootPath(conf);
- warehousePath = new Path(rootDir, TajoConstants.WAREHOUSE_DIR_NAME).toString();
+ Path rootDir = getTajoRootDir(conf);
+ warehousePath = new Path(rootDir, TajoConstants.WAREHOUSE_DIR_NAME).toUri().toString();
conf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath);
return new Path(warehousePath);
} else {
@@ -361,25 +404,45 @@ public class TajoConf extends YarnConfiguration {
}
}
- public static Path getSystemPath(TajoConf conf) {
- Path rootPath = getTajoRootPath(conf);
+ public static Path getSystemDir(TajoConf conf) {
+ Path rootPath = getTajoRootDir(conf);
return new Path(rootPath, TajoConstants.SYSTEM_DIR_NAME);
}
- public static Path getSystemResourcePath(TajoConf conf) {
- return new Path(getSystemPath(conf), TajoConstants.SYSTEM_RESOURCE_DIR_NAME);
+ public static Path getSystemResourceDir(TajoConf conf) {
+ return new Path(getSystemDir(conf), TajoConstants.SYSTEM_RESOURCE_DIR_NAME);
}
- public static Path getStagingRoot(TajoConf conf) {
- String stagingRootDir = conf.getVar(ConfVars.STAGING_ROOT_DIR);
- Preconditions.checkState(stagingRootDir != null && !stagingRootDir.equals(""),
- TajoConstants.STAGING_DIR_NAME + " must be set before starting a Tajo Cluster starts up");
- return new Path(stagingRootDir);
+ private static boolean hasScheme(String path) {
+ return path.indexOf("file:/") == 0 || path.indexOf("hdfs:/") == 0;
}
- public static Path getSystemConf(TajoConf conf) {
- String stagingRootDir = conf.getVar(ConfVars.SYSTEM_CONF_PATH);
- Preconditions.checkNotNull(stagingRootDir, ConfVars.SYSTEM_CONF_PATH.varname + " is not set.");
- return new Path(stagingRootDir);
+ public static Path getStagingDir(TajoConf conf) {
+ String stagingDirString = conf.getVar(ConfVars.STAGING_ROOT_DIR);
+ if (!hasScheme(stagingDirString)) {
+ Path warehousePath = getWarehouseDir(conf);
+ FileSystem fs;
+ try {
+ fs = warehousePath.getFileSystem(conf);
+ } catch (Throwable e) {
+ throw null;
+ }
+ Path path = new Path(fs.getUri().toString(), stagingDirString);
+ conf.setVar(ConfVars.STAGING_ROOT_DIR, path.toString());
+ return path;
+ }
+ return new Path(stagingDirString);
+ }
+
+ public static Path getSystemConfPath(TajoConf conf) {
+ String systemConfPathStr = conf.getVar(ConfVars.SYSTEM_CONF_PATH);
+ if (systemConfPathStr == null || systemConfPathStr.equals("")) {
+ Path systemResourcePath = getSystemResourceDir(conf);
+ Path systemConfPath = new Path(systemResourcePath, TajoConstants.SYSTEM_CONF_FILENAME);
+ conf.setVar(ConfVars.SYSTEM_CONF_PATH, systemConfPath.toString());
+ return systemConfPath;
+ } else {
+ return new Path(systemConfPathStr);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
new file mode 100644
index 0000000..6a43be3
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import java.net.*;
+
+public class NetUtils {
+ public static String normalizeInetSocketAddress(InetSocketAddress addr) {
+ return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ }
+
+ public static InetSocketAddress createSocketAddr(String addr) {
+ String [] splitted = addr.split(":");
+ return new InetSocketAddress(splitted[0], Integer.parseInt(splitted[1]));
+ }
+
+ /**
+ * Util method to build socket addr from either:
+ * <host>
+ * <host>:<port>
+ * <fs>://<host>:<port>/<path>
+ */
+ public static InetSocketAddress createSocketAddr(String host, int port) {
+ return new InetSocketAddress(host, port);
+ }
+
+ public static InetSocketAddress createUnresolved(String addr) {
+ String [] splitted = addr.split(":");
+ return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+ }
+
+ /**
+ * Returns InetSocketAddress that a client can use to
+ * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+ * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+ * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+ *
+ * @param addr of a listener
+ * @return socket address that a client can use to connect to the server.
+ */
+ public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+ if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+ try {
+ addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+ } catch (UnknownHostException uhe) {
+ // shouldn't get here unless the host doesn't have a loopback iface
+ addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+ }
+ }
+ return addr;
+ }
+
+ /**
+ * Given an InetAddress, checks to see if the address is a local address, by
+ * comparing the address with all the interfaces on the node.
+ * @param addr address to check if it is local node's address
+ * @return true if the address corresponds to the local node
+ */
+ public static boolean isLocalAddress(InetAddress addr) {
+ // Check if the address is any local or loop back
+ boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+ // Check if the address is defined on any interface
+ if (!local) {
+ try {
+ local = NetworkInterface.getByInetAddress(addr) != null;
+ } catch (SocketException e) {
+ local = false;
+ }
+ }
+ return local;
+ }
+
+ public static String normalizeHost(String host) {
+ try {
+ InetAddress address = InetAddress.getByName(host);
+ if (isLocalAddress(address)) {
+ return InetAddress.getLocalHost().getHostAddress();
+ } else {
+ return address.getHostAddress();
+ }
+ } catch (UnknownHostException e) {
+ }
+ return host;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index af3d002..929dae8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -42,9 +42,9 @@ public abstract class BenchmarkSet {
public void init(TajoConf conf, String dataDir) throws IOException {
this.dataDir = dataDir;
- if (System.getProperty(ConfVars.TASKRUNNER_LISTENER_ADDRESS.varname) != null) {
+ if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
tajo = new TajoClient(NetUtils.createSocketAddr(
- System.getProperty(ConfVars.TASKRUNNER_LISTENER_ADDRESS.varname)));
+ System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
} else {
conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
tajo = new TajoClient(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 79915bb..7b33b09 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -30,6 +30,7 @@ import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.storage.CSVFile;
import java.io.IOException;
+import java.sql.SQLException;
public class TPCH extends BenchmarkSet {
private final Log LOG = LogFactory.getLog(TPCH.class);
@@ -166,6 +167,10 @@ public class TPCH extends BenchmarkSet {
private void loadTable(String tableName) throws ServiceException {
TableMeta meta = CatalogUtil.newTableMeta(getSchema(tableName), StoreType.CSV);
meta.putOption(CSVFile.DELIMITER, "|");
- tajo.createTable(tableName, new Path(dataDir, tableName), meta);
+ try {
+ tajo.createExternalTable(tableName, new Path(dataDir, tableName), meta);
+ } catch (SQLException s) {
+ throw new ServiceException(s);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index 8980eb7..984f6e4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -18,7 +18,6 @@
package org.apache.tajo.cli;
-import com.google.protobuf.ServiceException;
import jline.console.ConsoleReader;
import jline.console.history.FileHistory;
import jline.console.history.PersistentHistory;
@@ -26,6 +25,7 @@ import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
@@ -64,9 +64,10 @@ public class TajoCli {
private static final Class [] registeredCommands = {
DescTableCommand.class,
HelpCommand.class,
- AttachCommand.class,
DetachCommand.class,
- ExitCommand.class
+ ExitCommand.class,
+ Copyright.class,
+ Version.class
};
private static final String HOME_DIR = System.getProperty("user.home");
@@ -101,17 +102,17 @@ public class TajoCli {
// if there is no "-h" option,
if(hostName == null) {
- if (conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- hostName = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[0];
+ hostName = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
}
}
if (port == null) {
- if (conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- port = Integer.parseInt(conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[1]);
+ port = Integer.parseInt(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
}
}
@@ -119,7 +120,7 @@ public class TajoCli {
System.err.println("ERROR: cannot find valid Tajo server address");
System.exit(-1);
} else if (hostName != null && port != null) {
- conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, hostName+":"+port);
+ conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
client = new TajoClient(conf);
} else if (hostName == null && port == null) {
client = new TajoClient(conf);
@@ -254,12 +255,10 @@ public class TajoCli {
private void invokeCommand(String [] cmds) {
// this command should be moved to GlobalEngine
- Command invoked = null;
+ Command invoked;
try {
invoked = commands.get(cmds[0]);
invoked.invoke(cmds);
- } catch (IllegalArgumentException iae) {
- sout.println("usage: " + invoked.getCommand() + " " + invoked.getUsage());
} catch (Throwable t) {
sout.println(t.getMessage());
}
@@ -284,7 +283,7 @@ public class TajoCli {
sout.flush();
((PersistentHistory)this.reader.getHistory()).flush();
System.exit(0);
- } else if (cmds[0].equalsIgnoreCase("attach") || cmds[0].equalsIgnoreCase("detach")) {
+ } else if (cmds[0].equalsIgnoreCase("detach") && cmds.length > 1 && cmds[1].equalsIgnoreCase("table")) {
// this command should be moved to GlobalEngine
invokeCommand(cmds);
@@ -496,7 +495,7 @@ public class TajoCli {
@Override
public String getUsage() {
- return "[TB_NAME]";
+ return "[table_name]";
}
@Override
@@ -514,13 +513,28 @@ public class TajoCli {
@Override
public void invoke(String[] cmd) throws Exception {
- for (Map.Entry<String,Command> entry : commands.entrySet()) {
- sout.print(entry.getKey());
- sout.print(" ");
- sout.print(entry.getValue().getUsage());
- sout.print("\t");
- sout.println(entry.getValue().getDescription());
- }
+ sout.println();
+
+ sout.println("General");
+ sout.println(" \\copyright show Apache License 2.0");
+ sout.println(" \\version show Tajo version");
+ sout.println(" \\? show help");
+ sout.println(" \\q quit tsql");
+ sout.println();
+ sout.println();
+
+ sout.println("Informational");
+ sout.println(" \\d list tables");
+ sout.println(" \\d NAME describe table");
+ sout.println();
+ sout.println();
+
+ sout.println("Documentations");
+ sout.println(" tsql guide http://wiki.apache.org/tajo/tsql");
+ sout.println(" Query language http://wiki.apache.org/tajo/QueryLanguage");
+ sout.println(" Functions http://wiki.apache.org/tajo/Functions");
+ sout.println(" Backup & restore http://wiki.apache.org/tajo/BackupAndRestore");
+ sout.println(" Configuration http://wiki.apache.org/tajo/Configuration");
sout.println();
}
@@ -536,65 +550,97 @@ public class TajoCli {
}
// TODO - This should be dealt as a DDL statement instead of a command
- public class AttachCommand extends Command {
+ public class DetachCommand extends Command {
@Override
public String getCommand() {
- return "attach";
+ return "detach";
}
@Override
public void invoke(String[] cmd) throws Exception {
if (cmd.length != 3) {
- throw new IllegalArgumentException();
- }
- if (!client.existTable(cmd[1])) {
- client.attachTable(cmd[1], cmd[2]);
- sout.println("attached " + cmd[1] + " (" + cmd[2] + ")");
+ throw new IllegalArgumentException("usage: detach table [tb_name]");
} else {
- sout.println("ERROR: relation \"" + cmd[1] + "\" already exists");
+ if (client.existTable(cmd[2])) {
+ client.detachTable(cmd[2]);
+ sout.println("Table \"" + cmd[2] + "\" is detached.");
+ } else {
+ sout.println("ERROR: table \"" + cmd[1] + "\" does not exist");
+ }
}
}
@Override
public String getUsage() {
- return "TB_NAME PATH";
+ return "table [table_name]";
}
@Override
public String getDescription() {
- return "attach a existing table as a given table name";
+ return "detach a table, but it does not remove the table directory.";
}
}
- // TODO - This should be dealt as a DDL statement instead of a command
- public class DetachCommand extends Command {
+ public class Version extends Command {
+
@Override
public String getCommand() {
- return "detach";
+ return "\\version";
}
@Override
public void invoke(String[] cmd) throws Exception {
- if (cmd.length != 2) {
- throw new IllegalArgumentException();
- } else {
- if (client.existTable(cmd[1])) {
- client.detachTable(cmd[1]);
- sout.println("detached " + cmd[1] + " from tajo");
- } else {
- sout.println("ERROR: table \"" + cmd[1] + "\" does not exist");
- }
- }
+ sout.println(TajoConstants.TAJO_VERSION);
}
@Override
public String getUsage() {
- return "TB_NAME";
+ return "";
}
@Override
public String getDescription() {
- return "detach a table, but it does not remove the table directory.";
+ return "show Apache License 2.0";
+ }
+ }
+
+ public class Copyright extends Command {
+
+ @Override
+ public String getCommand() {
+ return "\\copyright";
+ }
+
+ @Override
+ public void invoke(String[] cmd) throws Exception {
+ sout.println();
+ sout.println(
+ " Licensed to the Apache Software Foundation (ASF) under one\n" +
+ " or more contributor license agreements. See the NOTICE file\n" +
+ " distributed with this work for additional information\n" +
+ " regarding copyright ownership. The ASF licenses this file\n" +
+ " to you under the Apache License, Version 2.0 (the\n" +
+ " \"License\"); you may not use this file except in compliance\n" +
+ " with the License. You may obtain a copy of the License at\n" +
+ "\n" +
+ " http://www.apache.org/licenses/LICENSE-2.0\n" +
+ "\n" +
+ " Unless required by applicable law or agreed to in writing, software\n" +
+ " distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
+ " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
+ " See the License for the specific language governing permissions and\n" +
+ " limitations under the License.");
+ sout.println();
+ }
+
+ @Override
+ public String getUsage() {
+ return "";
+ }
+
+ @Override
+ public String getDescription() {
+ return "show Apache License 2.0";
}
}
@@ -637,7 +683,7 @@ public class TajoCli {
try {
invoked.invoke(arguments);
} catch (IllegalArgumentException ige) {
- sout.println("usage: " + invoked.getCommand() + " " + invoked.getUsage());
+ sout.println(ige.getMessage());
} catch (Exception e) {
sout.println(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java
new file mode 100644
index 0000000..888170b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+public enum SQLStates {
+ ER_NO_SUCH_TABLE("42S02");
+
+ private String state;
+
+ SQLStates(String state) {
+ this.state = state;
+ }
+
+ public String getState() {
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 19fa618..d94c99f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -43,6 +43,7 @@ import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +65,7 @@ public class TajoClient {
public TajoClient(TajoConf conf) throws IOException {
this.conf = conf;
this.conf.set("tajo.disk.scheduler.report.interval", "0");
- String masterAddr = this.conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
+ String masterAddr = this.conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
InetSocketAddress addr = NetUtils.createSocketAddr(masterAddr);
connect(addr);
}
@@ -299,14 +300,18 @@ public class TajoClient {
return tajoMasterService.detachTable(null, builder.build()).getValue();
}
- public TableDesc createTable(String name, Path path, TableMeta meta)
- throws ServiceException {
+ public TableDesc createExternalTable(String name, Path path, TableMeta meta)
+ throws SQLException, ServiceException {
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
builder.setName(name);
- builder.setPath(path.toString());
+ builder.setPath(path.toUri().toString());
builder.setMeta(meta.getProto());
- TableResponse res = tajoMasterService.createTable(null, builder.build());
- return CatalogUtil.newTableDesc(res.getTableDesc());
+ TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+ if (res.getResultCode() == ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+ }
}
public boolean dropTable(String name) throws ServiceException {
@@ -325,14 +330,14 @@ public class TajoClient {
return res.getTablesList();
}
- public TableDesc getTableDesc(String tableName) throws ServiceException {
+ public TableDesc getTableDesc(String tableName) throws SQLException, ServiceException {
GetTableDescRequest.Builder build = GetTableDescRequest.newBuilder();
build.setTableName(tableName);
TableResponse res = tajoMasterService.getTableDesc(null, build.build());
- if (res == null) {
- return null;
- } else {
+ if (res.getResultCode() == ResultCode.OK) {
return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
index 84d5646..486ff9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
@@ -21,16 +21,14 @@ package org.apache.tajo.client;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
-import org.apache.commons.cli.Options;
import org.apache.tajo.catalog.DDLBuilder;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.function.builtin.Date;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
+import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
@@ -51,7 +49,7 @@ public class TajoDump {
formatter.printHelp( "tajo_dump [options] [table_name]", options );
}
- public static void main(String [] args) throws ParseException, IOException, ServiceException {
+ public static void main(String [] args) throws ParseException, IOException, ServiceException, SQLException {
TajoConf conf = new TajoConf();
CommandLineParser parser = new PosixParser();
@@ -68,17 +66,17 @@ public class TajoDump {
// if there is no "-h" option,
if(hostName == null) {
- if (conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- hostName = conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[0];
+ hostName = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
}
}
if (port == null) {
- if (conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- port = Integer.parseInt(conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[1]);
+ port = Integer.parseInt(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
}
}
@@ -87,7 +85,7 @@ public class TajoDump {
System.err.println("ERROR: cannot find valid Tajo server address");
System.exit(-1);
} else if (hostName != null && port != null) {
- conf.setVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS, hostName+":"+port);
+ conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
client = new TajoClient(conf);
} else if (hostName == null && port == null) {
client = new TajoClient(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index df6081d..4a305ae 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -84,7 +84,9 @@ public class LogicalPlan {
}
public String newNonameColumnName(String prefix) {
- return "?" + prefix + "_" + (noNameColumnId++);
+ String suffix = noNameColumnId == 0 ? "" : String.valueOf(noNameColumnId);
+ noNameColumnId++;
+ return "?" + prefix + suffix;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index a8aaa68..4a3940c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -292,7 +292,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_INNER_HASH_JOIN_THRESHOLD);
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
boolean hashJoin = false;
if (leftSize < threshold || rightSize < threshold) {
@@ -363,7 +363,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
- if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.INMEMORY_OUTER_HASH_AGGREGATION_THRESHOLD)) {
+ if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
// we can implement left outer join using hash join, using the right operand as the build relation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
@@ -381,7 +381,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
// blocking, but merge join is blocking as well)
String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
long outerSize = estimateSizeRecursive(context, outerLineage4);
- if (outerSize < conf.getLongVar(TajoConf.ConfVars.INMEMORY_OUTER_HASH_AGGREGATION_THRESHOLD)){
+ if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
} else {
@@ -701,7 +701,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
long estimatedSize = estimateSizeRecursive(context, outerLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_AGGREGATION_THRESHOLD);
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
// if the relation size is less than the threshold,
// the hash aggregation will be used.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index e540570..37f0f9e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -171,7 +171,7 @@ public class GlobalPlanner {
TableMeta leftMeta = leftScan.getTableDesc().getMeta();
TableMeta rightMeta = rightScan.getTableDesc().getMeta();
- long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.BROADCAST_JOIN_THRESHOLD);
+ long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
if (leftMeta.getStat().getNumBytes() < broadcastThreshold) {
leftBroadcasted = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 19fde88..12bd30b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -50,7 +50,7 @@ public class ExternalSortExec extends SortExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
this.plan = plan;
- this.MEM_TUPLE_NUM = context.getConf().getIntVar(ConfVars.EXTENAL_SORT_BUFFER_NUM);
+ this.MEM_TUPLE_NUM = context.getConf().getIntVar(ConfVars.EXECUTOR_SORT_EXTENAL_BUFFER_SIZE);
this.tupleSlots = new ArrayList<Tuple>(MEM_TUPLE_NUM);
this.sortTmpDir = new Path(context.getWorkDir(), UUID.randomUUID().toString());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 9d6b4ff..c3eb5f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -241,7 +241,7 @@ public class GlobalEngine extends AbstractService {
}
if(!createTable.isExternal()){
- Path tablePath = new Path(sm.getTableBaseDir(), createTable.getTableName().toLowerCase());
+ Path tablePath = new Path(sm.getWarehouseDir(), createTable.getTableName().toLowerCase());
createTable.setPath(tablePath);
} else {
Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
@@ -353,7 +353,7 @@ public class GlobalEngine extends AbstractService {
StoreTableNode storeTableNode = plan.getRootBlock().getStoreTableNode();
String tableName = storeTableNode.getTableName();
queryContext.setOutputTable(tableName);
- queryContext.setOutputPath(new Path(TajoConf.getWarehousePath(context.getConf()), tableName));
+ queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
queryContext.setCreateTable();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 6fba7f6..f4118bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -22,6 +22,7 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -46,8 +48,8 @@ import org.apache.tajo.engine.function.InCountry;
import org.apache.tajo.engine.function.builtin.*;
import org.apache.tajo.engine.function.string.*;
import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.master.rm.YarnTajoResourceManager;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.CommonTestingUtil;
@@ -60,6 +62,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -72,21 +75,21 @@ public class TajoMaster extends CompositeService {
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
- final public static FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission WAREHOUSE_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission WAREHOUSE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission STAGING_ROOTDIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission STAGING_ROOTDIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission SYSTEM_CONF_FILE_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission SYSTEM_CONF_FILE_PERMISSION = FsPermission.createImmutable((short) 0755);
private MasterContext context;
@@ -118,11 +121,11 @@ public class TajoMaster extends CompositeService {
}
public String getMasterName() {
- return tajoMasterService.getBindAddress().getHostName() + ":" + tajoMasterService.getBindAddress().getPort();
+ return NetUtils.normalizeInetSocketAddress(tajoMasterService.getBindAddress());
}
public String getVersion() {
- return "0.2.0";
+ return TajoConstants.TAJO_VERSION;
}
public TajoMasterClientService getTajoMasterClientService() {
@@ -176,7 +179,7 @@ public class TajoMaster extends CompositeService {
private void initResourceManager() throws Exception {
Class<WorkerResourceManager> resourceManagerClass = (Class<WorkerResourceManager>)
- systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, YarnTajoResourceManager.class);
+ systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class);
Constructor<WorkerResourceManager> constructor = resourceManagerClass.getConstructor(MasterContext.class);
resourceManager = constructor.newInstance(context);
resourceManager.init(context.getConf());
@@ -184,8 +187,8 @@ public class TajoMaster extends CompositeService {
private void initWebServer() throws Exception {
if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
- int httpPort = systemConf.getInt("tajo.master.http.port", 8080);
- webServer = StaticHttpServer.getInstance(this ,"admin", null, httpPort ,
+ InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+ webServer = StaticHttpServer.getInstance(this ,"admin", address.getHostName(), address.getPort(),
true, null, context.getConf(), null);
webServer.addServlet("queryServlet", "/query_exec", QueryExecutorServlet.class);
webServer.start();
@@ -194,12 +197,12 @@ public class TajoMaster extends CompositeService {
private void checkAndInitializeSystemDirectories() throws IOException {
// Get Tajo root dir
- this.tajoRootPath = TajoConf.getTajoRootPath(systemConf);
+ this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
LOG.info("Tajo Root Directory: " + tajoRootPath);
// Check and Create Tajo root dir
this.defaultFS = tajoRootPath.getFileSystem(systemConf);
- systemConf.set("fs.defaultFS", defaultFS.getUri().toString());
+ systemConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized.");
if (!defaultFS.exists(tajoRootPath)) {
defaultFS.mkdirs(tajoRootPath, new FsPermission(TAJO_ROOT_DIR_PERMISSION));
@@ -207,20 +210,20 @@ public class TajoMaster extends CompositeService {
}
// Check and Create system and system resource dir
- Path systemPath = TajoConf.getSystemPath(systemConf);
+ Path systemPath = TajoConf.getSystemDir(systemConf);
if (!defaultFS.exists(systemPath)) {
defaultFS.mkdirs(systemPath, new FsPermission(SYSTEM_DIR_PERMISSION));
LOG.info("System dir '" + systemPath + "' is created");
}
- Path systemResourcePath = TajoConf.getSystemResourcePath(systemConf);
+ Path systemResourcePath = TajoConf.getSystemResourceDir(systemConf);
if (!defaultFS.exists(systemResourcePath)) {
defaultFS.mkdirs(systemResourcePath, new FsPermission(SYSTEM_RESOURCE_DIR_PERMISSION));
LOG.info("System resource dir '" + systemResourcePath + "' is created");
}
// Get Warehouse dir
- this.wareHousePath = TajoConf.getWarehousePath(systemConf);
- LOG.info("Tajo Warehouse Dir: " + wareHousePath);
+ this.wareHousePath = TajoConf.getWarehouseDir(systemConf);
+ LOG.info("Tajo Warehouse dir: " + wareHousePath);
// Check and Create Warehouse dir
if (!defaultFS.exists(wareHousePath)) {
@@ -228,9 +231,11 @@ public class TajoMaster extends CompositeService {
LOG.info("Warehouse dir '" + wareHousePath + "' is created");
}
- Path stagingPath = TajoConf.getStagingRoot(systemConf);
+ Path stagingPath = TajoConf.getStagingDir(systemConf);
+ LOG.info("Staging dir: " + wareHousePath);
if (!defaultFS.exists(stagingPath)) {
defaultFS.mkdirs(stagingPath, new FsPermission(STAGING_ROOTDIR_PERMISSION));
+ LOG.info("Staging dir '" + stagingPath + "' is created");
}
}
@@ -420,11 +425,16 @@ public class TajoMaster extends CompositeService {
private void writeSystemConf() throws IOException {
// Storing the system configs
- Path systemResourcePath = TajoConf.getSystemResourcePath(systemConf);
- Path systemConfPath = new Path(systemResourcePath, "system_conf.xml");
- systemConf.setVar(ConfVars.SYSTEM_CONF_PATH, systemConfPath.toUri().toString());
+ Path systemConfPath = TajoConf.getSystemConfPath(systemConf);
+
+ if (!defaultFS.exists(systemConfPath.getParent())) {
+ defaultFS.mkdirs(systemConfPath.getParent());
+ }
+
+ if (defaultFS.exists(systemConfPath)) {
+ defaultFS.delete(systemConfPath, false);
+ }
- defaultFS.delete(systemConfPath, true);
FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath,
new FsPermission(SYSTEM_CONF_FILE_PERMISSION));
try {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 1ed0f54..6b7d8d3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -81,7 +81,7 @@ public class TajoMasterClientService extends AbstractService {
public void start() {
// start the rpc server
- String confClientServiceAddr = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
+ String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
try {
server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
@@ -90,7 +90,7 @@ public class TajoMasterClientService extends AbstractService {
}
server.start();
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+ this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
super.start();
}
@@ -287,26 +287,63 @@ public class TajoMasterClientService extends AbstractService {
String name = request.getTableName();
if (catalog.existsTable(name)) {
return TableResponse.newBuilder()
+ .setResultCode(ResultCode.OK)
.setTableDesc((TableDescProto) catalog.getTableDesc(name).getProto())
.build();
} else {
- return null;
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage("No such a table: " + request.getTableName())
+ .build();
}
}
@Override
- public TableResponse createTable(RpcController controller, CreateTableRequest request)
+ public TableResponse createExternalTable(RpcController controller, CreateTableRequest request)
throws ServiceException {
- Path path = new Path(request.getPath());
- TableMeta meta = new TableMetaImpl(request.getMeta());
- TableDesc desc;
try {
- desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), meta, path, false);
- } catch (Exception e) {
- return TableResponse.newBuilder().setErrorMessage(e.getMessage()).build();
- }
+ Path path = new Path(request.getPath());
+ FileSystem fs = path.getFileSystem(conf);
+
+ if (!fs.exists(path)) {
+ throw new IOException("No such a directory: " + path);
+ }
+
+ TableMeta meta = new TableMetaImpl(request.getMeta());
+
+ if (meta.getStat() == null) {
+ meta.setStat(new TableStat());
+ }
+
+ TableStat stat = meta.getStat();
+ long totalSize;
+ try {
+ totalSize = fs.getContentSummary(path).getSpaceConsumed();
+ } catch (IOException e) {
+ String message =
+ "Cannot get the volume of the table \"" + request.getName() + "\" from " + request.getPath();
+ LOG.warn(message);
+ throw new IOException(message, e);
+ }
+ stat.setNumBytes(totalSize);
- return TableResponse.newBuilder().setTableDesc((TableDescProto) desc.getProto()).build();
+ TableDesc desc;
+ try {
+ desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), meta, path, false);
+ } catch (Exception e) {
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage(e.getMessage()).build();
+ }
+
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.OK)
+ .setTableDesc((TableDescProto) desc.getProto()).build();
+ } catch (IOException ioe) {
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage(ioe.getMessage()).build();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 4b27e46..04b562c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -58,7 +58,7 @@ public class TajoMasterService extends AbstractService {
@Override
public void start() {
- String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS);
+ String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
try {
server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
@@ -67,7 +67,7 @@ public class TajoMasterService extends AbstractService {
}
server.start();
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS,
+ this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
NetUtils.normalizeInetSocketAddress(bindAddress));
LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
super.start();
@@ -139,7 +139,7 @@ public class TajoMasterService extends AbstractService {
WorkerResource workerResource = new WorkerResource();
String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
workerResource.setAllocatedHost(tokens[0]);
- workerResource.setManagerPort(Integer.parseInt(tokens[1]));
+ workerResource.setPeerRpcPort(Integer.parseInt(tokens[1]));
workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
index c4fd8b8..5117700 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -296,7 +296,7 @@ public class YarnContainerProxy extends ContainerProxy {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
FileSystem fs = null;
FileContext fsCtx = null;
- LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
+ LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
try {
fs = FileSystem.get(conf);
fsCtx = FileContext.getFileContext(conf);
@@ -305,7 +305,7 @@ public class YarnContainerProxy extends ContainerProxy {
}
try {
- Path systemConfPath = TajoConf.getSystemConf(conf);
+ Path systemConfPath = TajoConf.getSystemConfPath(conf);
if (!fs.exists(systemConfPath)) {
LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
index c07d759..c8792ca 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -68,7 +68,7 @@ public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskR
this.context = context;
this.yarnRPC = yarnRPC;
executorService = Executors.newFixedThreadPool(
- context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ context.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
public void start() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 89084d1..f89a017 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -66,7 +66,7 @@ public class QueryInfo {
if(queryMasterResource == null) {
return 0;
}
- return queryMasterResource.getManagerPort();
+ return queryMasterResource.getPeerRpcPort();
}
public int getQueryMasterClientPort() {
@@ -122,6 +122,6 @@ public class QueryInfo {
@Override
public String toString() {
- return queryId.toString() + "state=" + queryState +", queryMaster=" + queryMasterResource;
+ return queryId.toString() + "state=" + queryState +",progress=" + progress + ", queryMaster=" + queryMasterResource;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index faa5a5c..f5f029f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -171,13 +171,14 @@ public class QueryJobManager extends CompositeService {
if(queryHeartbeat.getTajoWorkerHost() != null) {
WorkerResource queryMasterResource = new WorkerResource();
queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
- queryMasterResource.setManagerPort(queryHeartbeat.getTajoWorkerPort());
+ queryMasterResource.setPeerRpcPort(queryHeartbeat.getTajoWorkerPort());
queryMasterResource.setClientPort(queryHeartbeat.getTajoWorkerClientPort());
queryMasterResource.setPullServerPort(queryHeartbeat.getTajoWorkerPullServerPort());
queryInfo.setQueryMasterResource(queryMasterResource);
}
queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
queryInfo.setQueryState(queryHeartbeat.getState());
+ queryInfo.setProgress(queryHeartbeat.getQueryProgress());
if (queryHeartbeat.hasQueryFinishTime()) {
queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index fa6790d..2860b17 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -53,7 +53,7 @@ import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
public class QueryMaster extends CompositeService implements EventHandler {
private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
- private static int QUERY_SESSION_TIMEOUT = 60 * 1000; //60 sec
+ private int querySessionTimeout;
private Clock clock;
@@ -91,7 +91,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
try {
systemConf = (TajoConf)conf;
- QUERY_SESSION_TIMEOUT = 60 * 1000;
+ querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
queryMasterContext = new QueryMasterContext(systemConf);
clock = new SystemClock();
@@ -368,7 +368,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
try {
long lastHeartbeat = eachTask.getLastClientHeartbeat();
long time = System.currentTimeMillis() - lastHeartbeat;
- if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+ if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) {
LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
eachTask.expiredSessionTimeout();
}
@@ -383,8 +383,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
class FinishedQueryMasterTaskCleanThread extends Thread {
public void run() {
- int expireIntervalTime = systemConf.getInt("tajo.worker.history.expire.interval.min", 12 * 60); //12 hour
- LOG.info("FinishedQueryMasterTaskCleanThread started: expireIntervalTime=" + expireIntervalTime);
+ int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+ LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
while(!queryMasterStop.get()) {
try {
Thread.sleep(60 * 1000 * 60); // hourly
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index ae1508c..26cba45 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -113,8 +113,7 @@ public class QueryMasterTask extends CompositeService {
try {
queryTaskContext = new QueryMasterTaskContext();
- String resourceManagerClassName = conf.get("tajo.resource.manager",
- TajoWorkerResourceManager.class.getCanonicalName());
+ String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
resourceAllocator = new TajoResourceAllocator(queryTaskContext);
@@ -307,7 +306,7 @@ public class QueryMasterTask extends CompositeService {
ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
- FileSystem defaultFS = FileSystem.get(systemConf);
+ FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
Path stagingDir = null;
Path outputDir = null;
@@ -316,7 +315,7 @@ public class QueryMasterTask extends CompositeService {
// Create Output Directory
////////////////////////////////////////////
- stagingDir = new Path(TajoConf.getStagingRoot(systemConf), queryId.toString());
+ stagingDir = new Path(TajoConf.getStagingDir(systemConf), queryId.toString());
if (defaultFS.exists(stagingDir)) {
throw new IOException("The staging directory '" + stagingDir + "' already exists");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 9b9c63f..f01cb75 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -273,7 +273,6 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public void transition(QueryUnitAttempt queryUnitAttempt,
TaskAttemptEvent taskAttemptEvent) {
- LOG.info(">>>>>>>>> Already Assigned: " + queryUnitAttempt.getId());
}
}
@@ -283,7 +282,6 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public void transition(QueryUnitAttempt queryUnitAttempt,
TaskAttemptEvent taskAttemptEvent) {
- LOG.info(">>>>>>>>> Already Done: " + queryUnitAttempt.getId());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 10795ea..178e9b5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -161,7 +161,7 @@ public class Repartitioner {
// of a larger table
int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
int desireJoinTaskVolumn = subQuery.getContext().getConf().
- getIntVar(ConfVars.JOIN_TASK_VOLUME);
+ getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
// calculate the number of tasks according to the data size
int mb = (int) Math.ceil((double)stats[largerIdx].getNumBytes() / 1048576);
|