tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-129: Enable the constructor of NettyServerBase to take a service name. (hyunsik)
Date Fri, 16 Aug 2013 10:49:59 GMT
Updated Branches:
  refs/heads/master f36dbe863 -> 6899815fc


TAJO-129: Enable the constructor of NettyServerBase to take a service name. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/6899815f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/6899815f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/6899815f

Branch: refs/heads/master
Commit: 6899815fc066793318787c43de960cc52017fcc4
Parents: f36dbe8
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Thu Aug 15 15:23:32 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Thu Aug 15 15:24:30 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../org/apache/tajo/catalog/CatalogServer.java  |  8 +--
 .../org/apache/tajo/catalog/TestCatalog.java    |  1 -
 .../java/org/apache/tajo/conf/TajoConf.java     |  1 +
 .../java/org/apache/tajo/client/TajoClient.java | 27 ++++---
 .../tajo/master/TajoMasterClientService.java    | 10 +--
 .../tajo/master/querymaster/QueryMaster.java    | 11 ++-
 .../querymaster/QueryMasterClientService.java   |  9 +--
 .../querymaster/QueryMasterManagerService.java  | 14 ++--
 .../org/apache/tajo/TajoTestingCluster.java     |  7 +-
 .../org/apache/tajo/rpc/NettyServerBase.java    | 75 +++++++++++++++-----
 .../apache/tajo/rpc/ProtoAsyncRpcClient.java    |  2 +-
 .../apache/tajo/rpc/ProtoAsyncRpcServer.java    |  2 +-
 .../apache/tajo/rpc/ProtoBlockingRpcClient.java |  2 +-
 .../apache/tajo/rpc/ProtoBlockingRpcServer.java |  2 +-
 .../java/org/apache/tajo/util/NetUtils.java     | 40 ++++++++++-
 .../org/apache/tajo/rpc/TestProtoAsyncRpc.java  | 52 +++++---------
 .../apache/tajo/rpc/TestProtoBlockingRpc.java   | 39 +++-------
 18 files changed, 174 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e84b29..45d1f17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-129: Enable the constructor of NettyServerBase to take a service
+    name. (hyunsik)
+
     TAJO-91: Launch QueryMaster on NodeManager per query. 
     (hyoungjunkim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index e82afc8..dd36c3e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -128,13 +128,11 @@ public class CatalogServer extends AbstractService {
     // Creation of a HSA will force a resolve.
     InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
     try {
-      this.rpcServer = new ProtoBlockingRpcServer(
-          CatalogProtocol.class,
-          handler, initIsa);
+      this.rpcServer = new ProtoBlockingRpcServer(CatalogProtocol.class, handler, initIsa);
       this.rpcServer.start();
 
-      this.bindAddress = this.rpcServer.getBindAddress();
-      this.serverName = NetUtils.getIpPortString(bindAddress);
+      this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
+      this.serverName = NetUtils.normalizeInetSocketAddress(bindAddress);
       conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
     } catch (Exception e) {
       LOG.error("Cannot start RPC Server of CatalogServer", e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 1ff7e61..6d08521 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -78,7 +78,6 @@ public class TestCatalog {
 		assertTrue(catalog.existsTable("getTable"));
 		
 		TableDesc meta2 = catalog.getTableDesc("getTable");
-		System.out.println(meta2);
 		
 		catalog.deleteTable("getTable");
 		assertFalse(catalog.existsTable("getTable"));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index fb7c268..f4b20ff 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -67,6 +67,7 @@ public class TajoConf extends YarnConfiguration {
     TASKRUNNER_LISTENER_ADDRESS("tajo.master.taskrunnerlistener.addr", "0.0.0.0:0"), // used
internally
     CLIENT_SERVICE_ADDRESS("tajo.master.clientservice.addr", "127.0.0.1:9004"),
     QUERY_MASTER_MANAGER_SERVICE_ADDRESS("tajo.master.querymastermanager.addr", "127.0.0.1:9005"),
+    QUERY_MASTER_CLIENT_SERVICE_ADDRESS("tajo.qmm.client.addr", "0.0.0.0:0"),
 
     //////////////////////////////////
     // Catalog Configuration

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 5b4b064..cd8706e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -22,7 +22,6 @@ import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogUtil;
@@ -31,14 +30,15 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.ResultSetImpl;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol.*;
-import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.QueryMasterClientProtocol.*;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 import org.apache.tajo.rpc.ProtoBlockingRpcClient;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
@@ -90,7 +90,7 @@ public class TajoClient {
     }
 
     LOG.info("connected to tajo cluster (" +
-        org.apache.tajo.util.NetUtils.getIpPortString(addr) + ")");
+        org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(addr) + ")");
   }
 
   public void close() {
@@ -108,10 +108,11 @@ public class TajoClient {
       try {
         queryMasterConnectionMap.get(queryId).killQuery(null, queryId.getProto());
       } catch (Exception e) {
-        LOG.warn("Fail to close query:" + queryId + "," + e.getMessage(), e);
+        LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage()
+ ")", e);
       }
       queryMasterClientMap.get(queryId).close();
-      LOG.info("Closed QueryMaster connection(" + queryId + "," + queryMasterClientMap.get(queryId).getRemoteAddress()
+ ")");
+      LOG.info("Closed a QueryMaster connection (qid=" + queryId + ", addr="
+          + queryMasterClientMap.get(queryId).getRemoteAddress() + ")");
       queryMasterClientMap.remove(queryId);
       queryMasterConnectionMap.remove(queryId);
     }
@@ -169,7 +170,6 @@ public class TajoClient {
 
       String queryMasterHost = res.getQueryMasterHost();
       if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
-        LOG.info("=========> connect to querymaster:" + queryMasterHost);
         connectionToQueryMaster(queryId, queryMasterHost, res.getQueryMasterPort());
       }
     }
@@ -185,8 +185,8 @@ public class TajoClient {
       queryMasterConnectionMap.put(queryId, service);
       queryMasterClientMap.put(queryId, client);
 
-      LOG.debug("connected to Query Master (" +
-              org.apache.tajo.util.NetUtils.getIpPortString(addr) + ")");
+      LOG.info("Connected to Query Master (qid=" + queryId + ", addr=" +
+          NetUtils.normalizeInetSocketAddress(addr) + ")");
     } catch (Exception e) {
       LOG.error(e.getMessage());
       throw new RuntimeException(e);
@@ -221,8 +221,7 @@ public class TajoClient {
 
     while(status != null && isQueryRunnning(status.getState())) {
       try {
-//        Thread.sleep(500);
-        Thread.sleep(2000);
+        Thread.sleep(1000);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
@@ -238,7 +237,7 @@ public class TajoClient {
       }
 
     } else {
-      LOG.warn("=====>Query failed:" + status.getState());
+      LOG.warn("Query " + status.getQueryId() + ") failed: " + status.getState());
 
       //TODO throw SQLException(?)
       return createNullResultSet(queryId);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 273a3c1..b433718 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.tajo.QueryId;
@@ -48,6 +47,7 @@ import org.apache.tajo.rpc.ProtoBlockingRpcServer;
 import org.apache.tajo.rpc.RemoteException;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
@@ -88,10 +88,9 @@ public class TajoMasterClientService extends AbstractService {
       LOG.error(e);
     }
     server.start();
-    bindAddress = server.getBindAddress();
-    this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS,
-        org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
-    LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+    LOG.info("TajoMasterClientService startup");
     super.start();
   }
 
@@ -100,6 +99,7 @@ public class TajoMasterClientService extends AbstractService {
     if (server != null) {
       server.shutdown();
     }
+    LOG.info("TajoMasterClientService shutdown");
     super.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 871ba77..4b15c61 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
@@ -68,6 +67,7 @@ import org.apache.tajo.rpc.ProtoBlockingRpcClient;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
@@ -270,14 +270,13 @@ public class QueryMaster extends CompositeService implements EventHandler
{
         this.rpcServer = new ProtoAsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
         this.rpcServer.start();
 
-        this.bindAddr = rpcServer.getBindAddress();
-        this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+        this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+        this.addr = NetUtils.normalizeInetSocketAddress(this.bindAddr);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
       }
-      // Get the master address
-      LOG.info(QueryMasterService.class.getSimpleName() + " is bind to " + addr);
       queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+      LOG.info("QueryMasterService startup");
     }
 
     @Override
@@ -311,8 +310,8 @@ public class QueryMaster extends CompositeService implements EventHandler
{
       if(clientSessionTimeoutCheckThread != null) {
         clientSessionTimeoutCheckThread.interrupt();
       }
-      LOG.info("QueryMasterService stopped");
       super.stop();
+      LOG.info("QueryMasterService stopped");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
index 1a326fe..74298e5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.rpc.ProtoBlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.InetAddress;
@@ -67,14 +68,14 @@ public class QueryMasterClientService extends AbstractService {
       this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler,
initIsa);
       this.rpcServer.start();
 
-      this.bindAddr = rpcServer.getBindAddress();
-      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = NetUtils.normalizeInetSocketAddress(bindAddr);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
     // Get the master address
-    LOG.info(QueryMasterClientService.class.getSimpleName() + " is bind to " + addr);
-    //queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+    LOG.info(QueryMasterClientService.class.getSimpleName() + " (" + queryContext.getQueryId()
+ ") listens on "
+        + addr);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index a3c7b75..65f237c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -22,7 +22,6 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.conf.TajoConf;
@@ -32,9 +31,12 @@ import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.rpc.ProtoBlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
 
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
 public class QueryMasterManagerService extends AbstractService {
   private final static Log LOG = LogFactory.getLog(QueryMasterManagerService.class);
 
@@ -57,7 +59,7 @@ public class QueryMasterManagerService extends AbstractService {
   @Override
   public void start() {
     // TODO resolve hostname
-    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
+    String confMasterServiceAddr = conf.getVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
     try {
       server = new ProtoBlockingRpcServer(QueryMasterManagerProtocol.class, masterHandler,
initIsa);
@@ -65,10 +67,9 @@ public class QueryMasterManagerService extends AbstractService {
       LOG.error(e);
     }
     server.start();
-    bindAddress = server.getBindAddress();
-    this.conf.setVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS,
-            org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
-    LOG.info("Instantiated QueryMasterManagerService at " + this.bindAddress);
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    this.conf.setVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+    LOG.info("QueryMasterManagerService startup");
     super.start();
   }
 
@@ -78,6 +79,7 @@ public class QueryMasterManagerService extends AbstractService {
       server.shutdown();
       server = null;
     }
+    LOG.info("QueryMasterManagerService shutdown");
     super.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 695fcd6..30770cb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -39,7 +39,6 @@ import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.util.NetUtils;
 
 import java.io.*;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.sql.ResultSet;
@@ -209,7 +208,7 @@ public class TajoTestingCluster {
     catalogServer = new MiniCatalogServer(conf);
     CatalogServer catServer = catalogServer.getCatalogServer();
     InetSocketAddress sockAddr = catServer.getBindAddress();
-    c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.getIpPortString(sockAddr));
+    c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
 
     return this.catalogServer;
   }
@@ -352,10 +351,10 @@ public class TajoTestingCluster {
       yarnCluster.start();
 
       conf.set(YarnConfiguration.RM_ADDRESS,
-          NetUtils.getIpPortString(yarnCluster.getResourceManager().
+          NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
               getClientRMService().getBindAddress()));
       conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          NetUtils.getIpPortString(yarnCluster.getResourceManager().
+          NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
               getApplicationMasterService().getBindAddress()));
 
       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 9d63317..b520b3e 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -20,6 +20,7 @@ package org.apache.tajo.rpc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -32,10 +33,14 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.Random;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class NettyServerBase {
   private static final Log LOG = LogFactory.getLog(NettyServerBase.class);
+  private static final String DEFAULT_PREFIX = "RpcServer_";
+  private static final AtomicInteger sequenceId = new AtomicInteger(0);
 
+  protected String serviceName;
   protected InetSocketAddress serverAddr;
   protected InetSocketAddress bindAddress;
   protected ChannelFactory factory;
@@ -43,17 +48,19 @@ public class NettyServerBase {
   protected ServerBootstrap bootstrap;
   protected Channel channel;
 
-  public NettyServerBase(InetSocketAddress addr) {
-    if (addr.getPort() == 0) {
-      try {
-        int port = getUnusedPort();
-        serverAddr = new InetSocketAddress(addr.getHostName(), port);
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    } else {
-      serverAddr = addr;
-    }
+  private InetSocketAddress initIsa;
+
+  public NettyServerBase(InetSocketAddress address) {
+    this.initIsa = address;
+  }
+
+  public NettyServerBase(String serviceName, InetSocketAddress addr) {
+    this.serviceName = serviceName;
+    this.initIsa = addr;
+  }
+
+  public void setName(String name) {
+    this.serviceName = name;
   }
 
   public void init(ChannelPipelineFactory pipeline) {
@@ -73,15 +80,30 @@ public class NettyServerBase {
     bootstrap.setOption("child.receiveBufferSize", 1048576 * 2);
   }
 
-  public InetSocketAddress getBindAddress() {
+  public InetSocketAddress getListenAddress() {
     return this.bindAddress;
   }
 
   public void start() {
+    if (serviceName == null) {
+      this.serviceName = getNextDefaultServiceName();
+    }
+
+    if (initIsa.getPort() == 0) {
+      try {
+        int port = getUnusedPort();
+        serverAddr = new InetSocketAddress(initIsa.getHostName(), port);
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    } else {
+      serverAddr = initIsa;
+    }
+
     this.channel = bootstrap.bind(serverAddr);
     this.bindAddress = (InetSocketAddress) channel.getLocalAddress();
 
-    LOG.info("RpcServer on " + this.bindAddress);
+    LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress);
   }
 
   public Channel getChannel() {
@@ -95,16 +117,33 @@ public class NettyServerBase {
     if(factory != null) {
       factory.releaseExternalResources();
     }
-    LOG.info("RpcServer (" + org.apache.tajo.util.NetUtils.getIpPortString(bindAddress)
-        + ") shutdown");
+    LOG.info("Rpc (" + serviceName + ") listened on "
+        + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+  }
+
+  private static String getNextDefaultServiceName() {
+    return DEFAULT_PREFIX + sequenceId.getAndIncrement();
   }
 
-  private static final Random randomPort = new Random(System.currentTimeMillis());
+  private static final int startPortRange = 10000;
+  private static final int endPortRange = 50000;
+  private static final Random rnd = new Random(System.currentTimeMillis());
+  // each system has a different starting port number within the given range.
+  private static final AtomicInteger nextPortNum =
+      new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange));
+
+
   private synchronized static int getUnusedPort() throws IOException {
     while (true) {
-      int port = randomPort.nextInt(10000) + 50000;
+      int port = nextPortNum.getAndIncrement();
+      if (port >= endPortRange) {
+        synchronized (nextPortNum) {
+          nextPortNum.set(startPortRange);
+          port = nextPortNum.getAndIncrement();
+        }
+      }
       if (available(port)) {
-        LOG.info("Found unused port:" + port);
+        LOG.info("Detect an unused port:" + port);
         return port;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
index 6afbdf5..c58db58 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
@@ -165,7 +165,7 @@ public class ProtoAsyncRpcClient extends NettyClientBase {
 
   private String getErrorMessage(String message) {
     return "Exception [" + protocol.getCanonicalName() +
-        "(" + NetUtils.getIpPortString((InetSocketAddress)
+        "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
         getChannel().getRemoteAddress()) + ")]: " + message;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
index 46ad761..56faaba 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
@@ -42,7 +42,7 @@ public class ProtoAsyncRpcServer extends NettyServerBase {
                              final Object instance,
                              final InetSocketAddress bindAddress)
       throws Exception {
-    super(bindAddress);
+    super(protocol.getSimpleName(), bindAddress);
 
     String serviceClassName = protocol.getName() + "$" +
         protocol.getSimpleName() + "Service";

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
index 8b61ce4..2018f6d 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
@@ -130,7 +130,7 @@ public class ProtoBlockingRpcClient extends NettyClientBase {
 
   private String getErrorMessage(String message) {
     return "Exception [" + protocol.getCanonicalName() +
-        "(" + NetUtils.getIpPortString((InetSocketAddress)
+        "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
         getChannel().getRemoteAddress()) + ")]: " + message;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
index d4ea8d4..cc47b7b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
@@ -41,7 +41,7 @@ public class ProtoBlockingRpcServer extends NettyServerBase {
                                 final InetSocketAddress bindAddress)
       throws Exception {
 
-    super(bindAddress);
+    super(protocol.getSimpleName(), bindAddress);
 
     String serviceClassName = protocol.getName() + "$" +
         protocol.getSimpleName() + "Service";

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
index 1c2b13c..f64fa59 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
@@ -18,10 +18,12 @@
 
 package org.apache.tajo.util;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 
 public class NetUtils {
-  public static String getIpPortString(InetSocketAddress addr) {
+  public static String normalizeInetSocketAddress(InetSocketAddress addr) {
     return addr.getAddress().getHostAddress() + ":" + addr.getPort();
   }
 
@@ -29,4 +31,40 @@ public class NetUtils {
     String [] splitted = addr.split(":");
     return new InetSocketAddress(splitted[0], Integer.parseInt(splitted[1]));
   }
+
+  /**
+   * Util method to build socket addr from either:
+   *   <host>
+   *   <host>:<port>
+   *   <fs>://<host>:<port>/<path>
+   */
+  public static InetSocketAddress createSocketAddr(String host, int port) {
+    return new InetSocketAddress(host, port);
+  }
+
+  public static InetSocketAddress createUnresolved(String addr) {
+    String [] splitted = addr.split(":");
+    return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+  }
+
+  /**
+   * Returns InetSocketAddress that a client can use to
+   * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+   * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+   * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+   *
+   * @param addr of a listener
+   * @return socket address that a client can use to connect to the server.
+   */
+  public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+    if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+      try {
+        addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+      } catch (UnknownHostException uhe) {
+        // shouldn't get here unless the host doesn't have a loopback iface
+        addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+      }
+    }
+    return addr;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
index cc371c4..69d68b8 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
@@ -19,18 +19,17 @@
 package org.apache.tajo.rpc;
 
 import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.rpc.test.DummyProtocol;
 import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
 import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
 import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
@@ -57,16 +56,21 @@ public class TestProtoAsyncRpc {
     server = new ProtoAsyncRpcServer(DummyProtocol.class,
         service, new InetSocketAddress(0));
     server.start();
-    client = new ProtoAsyncRpcClient(DummyProtocol.class, server.getBindAddress());
+    client = new ProtoAsyncRpcClient(DummyProtocol.class, server.getListenAddress());
     stub = client.getStub();
   }
 
   @After
   public void tearDown() throws Exception {
-    client.close();
-    server.shutdown();
+    if(client != null) {
+      client.close();
+    }
+    if(server != null) {
+      server.shutdown();
+    }
   }
 
+  boolean calledMarker = false;
   @Test
   public void testRpc() throws Exception {
 
@@ -87,14 +91,17 @@ public class TestProtoAsyncRpc {
 
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
-
-    stub.echo(null, echoMessage, new RpcCallback<EchoMessage>() {
+    RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
       @Override
       public void run(EchoMessage parameter) {
         echo = parameter.getMessage();
         assertEquals(MESSAGE, echo);
+        calledMarker = true;
       }
-    });
+    };
+    stub.echo(null, echoMessage, callback);
+    Thread.sleep(1000);
+    assertTrue(calledMarker);
   }
 
   private CountDownLatch testNullLatch;
@@ -114,31 +121,6 @@ public class TestProtoAsyncRpc {
     assertTrue(service.getNullCalled);
   }
 
-  private CountDownLatch testGetErrorLatch;
-
-  //@Test
-  // TODO - to be fixed
-  public void testGetError() throws Exception {
-    EchoMessage echoMessage2 = EchoMessage.newBuilder()
-        .setMessage("[Don't Worry! It's an exception message for unit test]").
-            build();
-
-    testGetErrorLatch = new CountDownLatch(1);
-    RpcController controller = new NettyRpcController();
-    stub.getError(controller, echoMessage2, new RpcCallback<EchoMessage>() {
-      @Override
-      public void run(EchoMessage parameter) {
-        assertNull(parameter);
-        LOG.info("testGetError retrieved");
-        testGetErrorLatch.countDown();
-      }
-    });
-    testGetErrorLatch.await(1000, TimeUnit.MILLISECONDS);
-    assertTrue(service.getErrorCalled);
-    assertTrue(controller.failed());
-    assertEquals(echoMessage2.getMessage(), controller.errorText());
-  }
-
   @Test
   public void testCallFuture() throws Exception {
     EchoMessage echoMessage = EchoMessage.newBuilder()

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6899815f/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
index eeecc3d..2defc96 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.rpc;
 
 import com.google.protobuf.RpcController;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
 import org.apache.tajo.rpc.test.DummyProtocol;
 import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
 import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
@@ -38,13 +36,13 @@ import static org.junit.Assert.*;
 public class TestProtoBlockingRpc {
   public static String MESSAGE = "TestProtoBlockingRpc";
 
-  private static ProtoBlockingRpcServer server;
-  private static ProtoBlockingRpcClient client;
-  private static BlockingInterface stub;
-  private static DummyProtocolBlockingImpl service;
+  private ProtoBlockingRpcServer server;
+  private ProtoBlockingRpcClient client;
+  private BlockingInterface stub;
+  private DummyProtocolBlockingImpl service;
 
-  @BeforeClass
-  public static void setUp() throws Exception {
+  @Before
+  public void setUp() throws Exception {
     service = new DummyProtocolBlockingImpl();
     server = new ProtoBlockingRpcServer(DummyProtocol.class, service,
         new InetSocketAddress(10000));
@@ -55,8 +53,8 @@ public class TestProtoBlockingRpc {
     stub = client.getStub();
   }
 
-  @AfterClass
-  public static void tearDown() throws Exception {
+  @After
+  public void tearDown() throws Exception {
     if(client != null) {
       client.close();
     }
@@ -97,8 +95,6 @@ public class TestProtoBlockingRpc {
               .setMessage(MESSAGE)
               .build();
           stub.deley(null, message);
-//          client.close();
-//          client = null;
         } catch (Exception e) {
           e.printStackTrace();
           error.append(e.getMessage());
@@ -129,29 +125,16 @@ public class TestProtoBlockingRpc {
     };
     shutdownThread.start();
 
-    latch.await(10 * 1000, TimeUnit.MILLISECONDS);
+    latch.await(5 * 1000, TimeUnit.MILLISECONDS);
 
     assertTrue(latch.getCount() == 0);
 
     synchronized(error) {
-      error.wait(10 * 1000);
+      error.wait(5 * 1000);
     }
 
     if(!error.toString().isEmpty()) {
       fail(error.toString());
     }
   }
-
-  //@Test
-  public void testGetError() throws Exception {
-    EchoMessage echoMessage2 = EchoMessage.newBuilder()
-        .setMessage("[Don't Worry! It's an exception message for unit test]").
-            build();
-
-    RpcController controller = new NettyRpcController();
-    assertNull(stub.getError(controller, echoMessage2));
-    assertTrue(service.getErrorCalled);
-    assertTrue(controller.failed());
-    assertEquals(echoMessage2.getMessage(), controller.errorText());
-  }
 }
\ No newline at end of file


Mime
View raw message