From commits-return-5380-archive-asf-public=cust-asf.ponee.io@zeppelin.apache.org Thu Jun 13 07:42:13 2019
X-Original-To: archive-asf-public@cust-asf.ponee.io
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id A291A18064E for ; Thu, 13 Jun 2019 09:42:12 +0200 (CEST)
Received: (qmail 77669 invoked by uid 500); 13 Jun 2019 07:42:11 -0000
Mailing-List: contact commits-help@zeppelin.apache.org; run by ezmlm
Precedence: bulk
Reply-To: dev@zeppelin.apache.org
Delivered-To: mailing list commits@zeppelin.apache.org
Received: (qmail 77660 invoked by uid 99); 13 Jun 2019 07:42:11 -0000
Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Jun 2019 07:42:11 +0000
Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id A522787ACF; Thu, 13 Jun 2019 07:42:11 +0000 (UTC)
Date: Thu, 13 Jun 2019 07:42:11 +0000
To: "commits@zeppelin.apache.org"
Subject: [zeppelin] branch master updated: [ZEPPELIN-3623] Create interpreter process in the cluster mode
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
Message-ID: <156041173130.24011.6092345575319513089@gitbox.apache.org>
From: liuxun@apache.org
X-Git-Host: gitbox.apache.org
X-Git-Repo: zeppelin
X-Git-Refname: refs/heads/master
X-Git-Reftype: branch
X-Git-Oldrev: 4ee6a82d11e1cd0d11f74a6dda0676a88fc4a6ed
X-Git-Newrev: fd6e80cfc49647ac57ca53e1dff9cdfbae164865
X-Git-Rev: fd6e80cfc49647ac57ca53e1dff9cdfbae164865
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

The following commit(s) were added to refs/heads/master by this push:
     new fd6e80c  [ZEPPELIN-3623] Create interpreter process in the cluster mode
fd6e80c is described below

commit fd6e80cfc49647ac57ca53e1dff9cdfbae164865
Author: Xun Liu
AuthorDate: Thu Jun 13 11:22:45 2019 +0800

    [ZEPPELIN-3623] Create interpreter process in the cluster mode

    ### What is this PR for?
    In distributed cluster deployment mode, Zeppelin looks for a server with idle resources in the cluster and creates the interpreter process on it.
    In single-server deployment mode, the interpreter process is created directly on that machine.

    Just set the cluster server list in `zeppelin-site.xml`:

    ```
    <property>
      <name>zeppelin.cluster.addr</name>
      <value>10.120.196.234:6000,10.120.196.235:6000,10.120.196.236:6000</value>
    </property>
    ```

    Any Zeppelin server can then create the interpreter process on a server in the cluster.

    [Cluster Design Document](https://docs.google.com/document/d/1a8QLSyR3M5AhlG1GIYuDTj6bwazeuVDKCRRBm-Qa3Bw/edit#heading=h.s41ckl271z8s)

    ### What type of PR is it?
    [Feature]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3623

    ### How should this be tested?
    * [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/537437337)

    ### Screenshots (if appropriate)
    #### The note is executed on zeppelin-server-234 and the interpreter process is created on the zeppelin-server-236 host.
    ![ClusterCreateIntpProcess](https://user-images.githubusercontent.com/3677382/58382741-7dc61680-8000-11e9-8379-78b8e6743b63.gif)

    ### Questions:
    * Do the license files need an update?
    * Are there breaking changes for older versions?
    * Does this need documentation?

    Author: Xun Liu

    Closes #3372 from liuxunorg/ZEPPELIN-3623 and squashes the following commits:

    ea4ded954 [Xun Liu] Delete unused cluster thrift interfaces
    712bba50f [Xun Liu] Add license header.
    3c8922cb6 [Xun Liu] Get the conf path through the `ZEPPELIN_HOME` or `ZEPPELIN_CONF_DIR` environment variable.
    b88c09942 [Xun Liu] The ClusterInterpreterLauncherTest is tested, Turn off cluster.
    83f540622 [Xun Liu] [ZEPPELIN-3623] Create interpreter process in the cluster mode
---
 zeppelin-interpreter-api/pom.xml | 6 +-
 .../apache/zeppelin/cluster/ClusterManager.java | 3 +-
 .../zeppelin/cluster/ClusterManagerClient.java | 4 +-
 .../zeppelin/cluster/ClusterManagerServer.java | 228 +++--
 .../apache/zeppelin/cluster/ClusterMonitor.java | 16 +-
 .../zeppelin/cluster/ClusterStateMachine.java | 18 +-
 .../ClusterEvent.java} | 9 +-
 .../ClusterEventListener.java} | 13 +-
 .../apache/zeppelin/cluster/meta/ClusterMeta.java | 14 +-
 .../zeppelin/cluster/meta/ClusterMetaType.java | 4 +-
 .../zeppelin/conf/ZeppelinConfiguration.java | 34 +-
 .../remote/RemoteInterpreterServer.java | 57 +-
 .../thrift/ClusterIntpProcParameters.java | 914 -------------------
 .../interpreter/thrift/ClusterManagerService.java | 995 ---------------------
 .../src/main/thrift/ClusterManagerService.thrift | 32 -
 zeppelin-interpreter/src/main/thrift/genthrift.sh | 1 -
 .../zeppelin/cluster/ClusterMultiNodeTest.java | 159 ++++
 ...ManagerTest.java => ClusterSingleNodeTest.java} | 66 +-
 zeppelin-plugins/launcher/cluster/pom.xml | 90 ++
 .../launcher/ClusterInterpreterLauncher.java | 200 +++++
 .../launcher/ClusterInterpreterProcess.java | 141 +++
 .../launcher/ClusterInterpreterLauncherTest.java | 101 +++
 .../interpreter/launcher/ClusterMockTest.java | 79 +-
 zeppelin-plugins/pom.xml | 1 +
 .../org/apache/zeppelin/server/ZeppelinServer.java | 18 +-
 .../zeppelin/interpreter/InterpreterSetting.java | 6 +
 .../remote/RemoteInterpreterManagedProcess.java | 4 +
 27 files changed, 1023 insertions(+), 2190 deletions(-)

diff --git a/zeppelin-interpreter-api/pom.xml b/zeppelin-interpreter-api/pom.xml
index a43ee89..c1f0ae8 100644
--- a/zeppelin-interpreter-api/pom.xml
+++ b/zeppelin-interpreter-api/pom.xml
@@ -68,8 +68,6 @@
   org.apache.commons:commons-exec
   log4j:log4j
-  com.esotericsoftware:kryo
-  com.esotericsoftware:reflectasm
@@ -138,6 +136,10 @@
   io
   ${shaded.dependency.prefix}.io
+
+  com.esotericsoftware
+  ${shaded.dependency.prefix}.com.esotericsoftware
+
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 9c70a2e..2b2cd50 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -252,7 +252,7 @@ public abstract class ClusterManager {
       while (!raftInitialized()) {
         retry++;
         if (0 == retry % 30) {
-          LOGGER.error("Raft incomplete initialization! retry[{}]", retry);
+          LOGGER.warn("Raft incomplete initialization! retry[{}]", retry);
         }
         Thread.sleep(100);
       }
@@ -268,6 +268,7 @@ public abstract class ClusterManager {
         if (true == success) {
           // The operation was successfully deleted
           clusterMetaQueue.remove(metaEntity);
+          LOGGER.info("Cluster Meta Consume success! 
{}", metaEntity);
         } else {
           LOGGER.error("Cluster Meta Consume faild!");
         }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
index c969bd6..57d51e3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
@@ -18,7 +18,7 @@ package org.apache.zeppelin.cluster;
 import io.atomix.primitive.PrimitiveState;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 /**
  * Cluster management client class instantiated in zeppelin-interperter
@@ -63,7 +63,7 @@ public class ClusterManagerClient extends ClusterManager {
     // Instantiated cluster monitoring class
     clusterMonitor = new ClusterMonitor(this);
-    clusterMonitor.start(IntpProcessMeta, metaKey);
+    clusterMonitor.start(INTP_PROCESS_META, metaKey);
   }
   public void shutdown() {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 41e670a..eb64393 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.zeppelin.cluster;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.atomix.cluster.*;
 import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
 import io.atomix.cluster.impl.DefaultClusterMembershipService;
@@ -30,20 +32,10 @@ import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
 import 
io.atomix.utils.net.Address;
 import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
 import org.apache.zeppelin.cluster.meta.ClusterMeta;
 import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterFactoryInterface;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.ClusterIntpProcParameters;
-import org.apache.zeppelin.interpreter.thrift.ClusterManagerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,20 +48,22 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META;
 /**
  * Cluster management server class instantiated in zeppelin-server
  * 1. Create a raft server
  * 2. 
Remotely create interpreter's thrift service
  */
-public class ClusterManagerServer extends ClusterManager
-    implements ClusterManagerService.Iface {
+public class ClusterManagerServer extends ClusterManager {
   private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerServer.class);
   private static ClusterManagerServer instance = null;
@@ -79,22 +73,15 @@ public class ClusterManagerServer extends ClusterManager
   protected MessagingService messagingService = null;
-  // zeppelin cluster manager thrift service
-  private TThreadPoolServer clusterManagerTserver = null;
-  private ClusterManagerService.Processor clusterManagerProcessor = null;
-
-  // Find interpreter by note
-  private InterpreterFactoryInterface interpreterFactory = null;
-
   // Connect to the interpreter process that has been created
   public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS";
+  private List clusterEventListeners = new ArrayList<>();
+  // zeppelin cluster event
+  public static String ZEPL_CLUSTER_EVENT_TOPIC = "ZEPL_CLUSTER_EVENT_TOPIC";
+
   private ClusterManagerServer() {
     super();
-
-    clusterManagerProcessor = new ClusterManagerService.Processor<>(this);
-
-    deleteRaftSystemData();
   }
   public static ClusterManagerServer getInstance() {
@@ -106,23 +93,46 @@ public class ClusterManagerServer extends ClusterManager
     }
   }
-  public void start(InterpreterFactoryInterface interpreterFactory) {
+  public void start() {
     if (!zconf.isClusterMode()) {
       return;
     }
-    this.interpreterFactory = interpreterFactory;
-
     initThread();
     // Instantiated raftServer monitoring class
     String clusterName = getClusterNodeName();
     clusterMonitor = new ClusterMonitor(this);
-    clusterMonitor.start(ServerMeta, clusterName);
+    clusterMonitor.start(SERVER_META, clusterName);
     super.start();
   }
+  @VisibleForTesting
+  public void initTestCluster(String clusterAddrList, String host, int port) {
+    this.zeplServerHost = host;
+    this.raftServerPort = port;
+
+    // clear
+    clusterNodes.clear();
+    raftAddressMap.clear();
+
clusterMemberIds.clear();
+
+    String cluster[] = clusterAddrList.split(",");
+    for (int i = 0; i < cluster.length; i++) {
+      String[] parts = cluster[i].split(":");
+      String clusterHost = parts[0];
+      int clusterPort = Integer.valueOf(parts[1]);
+
+      String memberId = clusterHost + ":" + clusterPort;
+      Address address = Address.from(clusterHost, clusterPort);
+      Node node = Node.builder().withId(memberId).withAddress(address).build();
+      clusterNodes.add(node);
+      raftAddressMap.put(MemberId.from(memberId), address);
+      clusterMemberIds.add(MemberId.from(memberId));
+    }
+  }
+
   @Override
   public boolean raftInitialized() {
     if (null != raftServer && raftServer.isRunning()
@@ -145,32 +155,6 @@ public class ClusterManagerServer extends ClusterManager
     return true;
   }
-  protected void deleteRaftSystemData() {
-    String zeppelinHome = zconf.getZeppelinHome();
-    Path directory = new File(zeppelinHome, ".data").toPath();
-    if (Files.exists(directory)) {
-      try {
-        Files.walkFileTree(directory, new SimpleFileVisitor() {
-          @Override
-          public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
-              throws IOException {
-            Files.delete(file);
-            return FileVisitResult.CONTINUE;
-          }
-
-          @Override
-          public FileVisitResult postVisitDirectory(Path dir, IOException exc)
-              throws IOException {
-            Files.delete(dir);
-            return FileVisitResult.CONTINUE;
-          }
-        });
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
   private void initThread() {
     // RaftServer Thread
     new Thread(new Runnable() {
@@ -206,11 +190,15 @@ public class ClusterManagerServer extends ClusterManager
             bootstrapService,
             new MembershipConfig());
+        File atomixDateDir = com.google.common.io.Files.createTempDir();
+        atomixDateDir.deleteOnExit();
+
         RaftServer.Builder builder = RaftServer.builder(member.id())
             .withMembershipService(clusterService)
             .withProtocol(protocol)
             .withStorage(RaftStorage.builder()
                 .withStorageLevel(StorageLevel.MEMORY)
+                .withDirectory(atomixDateDir)
                 .withSerializer(storageSerializer)
                 .withMaxSegmentSize(1024 * 1024)
                 .build());
@@ -218,46 +206,10 @@ public class ClusterManagerServer extends ClusterManager
         raftServer = builder.build();
         raftServer.bootstrap(clusterMemberIds);
-        LOGGER.info("RaftServer run() <<<");
-      }
-    }).start();
-
-    // Cluster manager thrift thread
-    new Thread(new Runnable() {
-      @Override
-      public void run() {
-        LOGGER.info("TServerThread run() >>>");
-
-        ZeppelinConfiguration zconf = new ZeppelinConfiguration();
-        String portRange = zconf.getZeppelinServerRPCPortRange();
-
-        try {
-          TServerSocket serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
-          int tserverPort = serverTransport.getServerSocket().getLocalPort();
-
-          clusterManagerTserver = new TThreadPoolServer(
-              new TThreadPoolServer.Args(serverTransport).processor(clusterManagerProcessor));
-          LOGGER.info("Starting raftServer manager Tserver on port {}", tserverPort);
-
-          String nodeName = getClusterNodeName();
-          HashMap meta = new HashMap();
-          meta.put(ClusterMeta.NODE_NAME, nodeName);
-          meta.put(ClusterMeta.SERVER_TSERVER_HOST, zeplServerHost);
-          meta.put(ClusterMeta.SERVER_TSERVER_PORT, tserverPort);
-          meta.put(ClusterMeta.SERVER_START_TIME, new Date());
-
-          putClusterMeta(ServerMeta, nodeName, meta);
-        } catch (UnknownHostException e) {
-          LOGGER.error(e.getMessage());
-        } catch (SocketException e) {
-          LOGGER.error(e.getMessage());
-        } catch (IOException e) {
-          LOGGER.error(e.getMessage());
-        }
-
-        clusterManagerTserver.serve();
+        messagingService.registerHandler(ZEPL_CLUSTER_EVENT_TOPIC,
+            subscribeClusterEvent, MoreExecutors.directExecutor());
-        LOGGER.info("TServerThread run() <<<");
+        LOGGER.info("RaftServer run() <<<");
       }
     }).start();
   }
@@ -270,58 +222,34 @@ public class ClusterManagerServer extends ClusterManager
     try {
       // delete local machine meta
-      deleteClusterMeta(ServerMeta, getClusterNodeName());
+      deleteClusterMeta(SERVER_META, getClusterNodeName());
       Thread.sleep(300);
       clusterMonitor.shutdown();
       // wait raft commit metadata
       Thread.sleep(300);
     } catch (InterruptedException e) {
-      LOGGER.error(e.getMessage());
+      LOGGER.error(e.getMessage(), e);
     }
     if (null != raftServer && raftServer.isRunning()) {
       try {
         raftServer.shutdown().get(3, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
-        LOGGER.error(e.getMessage());
+        LOGGER.error(e.getMessage(), e);
       } catch (ExecutionException e) {
-        LOGGER.error(e.getMessage());
+        LOGGER.error(e.getMessage(), e);
       } catch (TimeoutException e) {
-        LOGGER.error(e.getMessage());
+        LOGGER.error(e.getMessage(), e);
       }
     }
-    clusterManagerTserver.stop();
-
     super.shutdown();
   }
-  public boolean openRemoteInterpreterProcess(
-      String host, int port, final ClusterIntpProcParameters clusterIntpProcParameters)
-      throws TException {
-    LOGGER.info("host: {}, port: {}, clusterIntpProcParameters: {}",
-        host, port, clusterIntpProcParameters);
-
-    try (TTransport transport = new TSocket(host, port)) {
-      transport.open();
-      TProtocol protocol = new TBinaryProtocol(transport);
-      ClusterManagerService.Client client = new ClusterManagerService.Client(protocol);
-
-      return client.createClusterInterpreterProcess(clusterIntpProcParameters);
-    }
-  }
-
-  @Override
-  public boolean createClusterInterpreterProcess(ClusterIntpProcParameters clusterIntpProcParameters) {
-    // TODO: ZEPPELIN-3623
-
-    return true;
-  }
-
   // Obtain the server node whose resources are idle in the cluster
   public HashMap getIdleNodeMeta() {
     HashMap idleNodeMeta = null;
-    HashMap> clusterMeta = getClusterMeta(ServerMeta, "");
+    HashMap> clusterMeta = getClusterMeta(SERVER_META, "");
     long memoryIdle = 0;
     for (Map.Entry> entry : clusterMeta.entrySet()) {
@@ -344,4 +272,56 @@ public class ClusterManagerServer extends ClusterManager
     return idleNodeMeta;
   }
+
+  public void unicastClusterEvent(String host, int port, String msg) {
+    LOGGER.info("send unicastClusterEvent message {}", msg);
+
+    Address address = Address.from(host, port);
+    CompletableFuture response = messagingService.sendAndReceive(address,
+
ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+    response.whenComplete((r, e) -> {
+      if (null == e) {
+        LOGGER.error(e.getMessage(), e);
+      } else {
+        LOGGER.info("unicastClusterEvent success! {}", msg);
+      }
+    });
+  }
+
+  public void broadcastClusterEvent(String msg) {
+    LOGGER.info("send broadcastClusterEvent message {}", msg);
+
+    for (Node node : clusterNodes) {
+      if (StringUtils.equals(node.address().host(), zeplServerHost)
+          && node.address().port() == raftServerPort) {
+        // skip myself
+        continue;
+      }
+
+      CompletableFuture response = messagingService.sendAndReceive(node.address(),
+          ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+      response.whenComplete((r, e) -> {
+        if (null == e) {
+          LOGGER.error(e.getMessage(), e);
+        } else {
+          LOGGER.info("broadcastClusterNoteEvent success! {}", msg);
+        }
+      });
+    }
+  }
+
+  private BiFunction subscribeClusterEvent = (address, data) -> {
+    String message = new String(data);
+    LOGGER.info("subscribeClusterEvent() {}", message);
+
+    for (ClusterEventListener eventListener : clusterEventListeners) {
+      eventListener.onClusterEvent(message);
+    }
+
+    return null;
+  };
+
+  public void addClusterEventListeners(ClusterEventListener listener) {
+    clusterEventListeners.add(listener);
+  }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
index 86fa6a7..4fe8d98 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
@@ -27,8 +27,8 @@ import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META;
 /**
  * cluster monitoring
@@ -94,11 +94,11 @@ public class ClusterMonitor {
       public void run() {
         while (running.get()) {
           switch (clusterMetaType) {
-            case ServerMeta:
+            case SERVER_META:
               sendMachineUsage();
               checkHealthy();
               break;
-            case IntpProcessMeta:
+            case INTP_PROCESS_META:
               sendHeartbeat();
               break;
             default:
@@ -136,6 +136,10 @@ public class ClusterMonitor {
     Map> clusterMeta = clusterManager.getClusterMeta(metaType, "");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("clusterMeta : {}", clusterMeta);
+    }
+
     for (Map.Entry> entry : clusterMeta.entrySet()) {
       String key = entry.getKey();
       Map meta = entry.getValue();
@@ -172,7 +176,7 @@ public class ClusterMonitor {
     mapMonitorUtil.put(ClusterMeta.HEARTBEAT, new Date());
     mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
-    clusterManager.putClusterMeta(IntpProcessMeta, metaKey, mapMonitorUtil);
+    clusterManager.putClusterMeta(INTP_PROCESS_META, metaKey, mapMonitorUtil);
   }
   // send the usage of each service
@@ -212,7 +216,7 @@ public class ClusterMonitor {
     mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
     String clusterName = clusterManager.getClusterNodeName();
-    clusterManager.putClusterMeta(ServerMeta, clusterName, mapMonitorUtil);
+    clusterManager.putClusterMeta(SERVER_META, clusterName, mapMonitorUtil);
   }
   private UsageUtil getMachineUsage() {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
index 460f6ac..dc07daa 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -90,12 +90,12 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
       logger.debug("ClusterStateMachine.backup()");
     }
-    // backup ServerMeta
+    // backup SERVER_META
     // cluster meta map struct
     // cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
     Map> mapServerMeta
-        = clusterMeta.get(ClusterMetaType.ServerMeta, "");
-    // write all ServerMeta size
+        = clusterMeta.get(ClusterMetaType.SERVER_META, "");
+    // write all SERVER_META size
     writer.writeInt(mapServerMeta.size());
     for (Map.Entry> entry : mapServerMeta.entrySet()) {
       // write cluster_name
@@ -111,11 +111,11 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
       }
     }
-    // backup IntpProcessMeta
+    // backup INTP_PROCESS_META
     // Interpreter meta map struct
     // IntpGroupId -> {server_tserver_host,server_tserver_port,...}
     Map> mapIntpProcMeta
-        = clusterMeta.get(ClusterMetaType.IntpProcessMeta, "");
+        = clusterMeta.get(ClusterMetaType.INTP_PROCESS_META, "");
     // write interpreter size
     writer.writeInt(mapIntpProcMeta.size());
     for (Map.Entry> entry : mapIntpProcMeta.entrySet()) {
@@ -140,7 +140,7 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
     }
     clusterMeta = new ClusterMeta();
-    // read all ServerMeta size
+    // read all SERVER_META size
     int nServerMeta = reader.readInt();
     for (int i = 0; i < nServerMeta; i++) {
       // read cluster_name
@@ -153,12 +153,12 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
         String key = reader.readString();
         Object value = reader.readObject();
-        clusterMeta.put(ClusterMetaType.ServerMeta,
+        clusterMeta.put(ClusterMetaType.SERVER_META,
             clusterName, Maps.immutableEntry(key, value));
       }
     }
-    // read all IntpProcessMeta size
+    // read all INTP_PROCESS_META size
     int nIntpMeta = reader.readInt();
     for (int i = 0; i < nIntpMeta; i++) {
       // read interpreter name
@@ -171,7 +171,7 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
         String key = reader.readString();
         Object value = reader.readObject();
-        clusterMeta.put(ClusterMetaType.IntpProcessMeta,
+
clusterMeta.put(ClusterMetaType.INTP_PROCESS_META,
             intpName, Maps.immutableEntry(key, value));
       }
     }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
similarity index 86%
copy from zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
copy to zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
index c6229bd..0e1120c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.cluster.meta;
+package org.apache.zeppelin.cluster.event;
 /**
- * Type of cluster metadata
+ * Cluster Event
  */
-public enum ClusterMetaType {
-  ServerMeta,
-  IntpProcessMeta
+public enum ClusterEvent {
+  CREATE_INTP_PROCESS
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
similarity index 67%
copy from zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
copy to zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
index c6229bd..00c0b3d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
@@ -14,12 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.cluster.meta;
+package org.apache.zeppelin.cluster.event;
 /**
- * Type of cluster metadata
+ * Listen for the NEW_NOTE、DEL_NOTE、REMOVE_NOTE_TO_TRASH ... event
+ * of the notebook in the NotebookServer#onMessage() function.
  */
-public enum ClusterMetaType {
-  ServerMeta,
-  IntpProcessMeta
+public interface ClusterEventListener {
+  public static final String CLUSTER_EVENT = "CLUSTER_EVENT";
+  public static final String CLUSTER_EVENT_MSG = "CLUSTER_EVENT_MSG";
+
+  void onClusterEvent(String msg);
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
index a26007c..e862635 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -37,8 +37,6 @@ public class ClusterMeta implements Serializable {
   // zeppelin-server meta
   public static String SERVER_HOST = "SERVER_HOST";
   public static String SERVER_PORT = "SERVER_PORT";
-  public static String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST";
-  public static String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT";
   public static String SERVER_START_TIME = "SERVER_START_TIME";
   // interperter-process meta
@@ -73,7 +71,7 @@ public class ClusterMeta implements Serializable {
     Map mapValue = (Map) value;
     switch (type) {
-      case ServerMeta:
+      case SERVER_META:
         // Because it may be partially updated metadata information
         if (mapServerMeta.containsKey(key)) {
           Map values = mapServerMeta.get(key);
@@ -82,7 +80,7 @@ public class ClusterMeta implements Serializable {
           mapServerMeta.put(key, mapValue);
         }
         break;
-      case IntpProcessMeta:
+      case INTP_PROCESS_META:
         if (mapInterpreterMeta.containsKey(key)) {
           Map values = mapInterpreterMeta.get(key);
           values.putAll(mapValue);
@@ -97,7 +95,7 @@ public class ClusterMeta implements Serializable {
     Map values = null;
     switch (type) {
-      case ServerMeta:
+      case SERVER_META:
         if (null == key || StringUtils.isEmpty(key)) {
           return mapServerMeta;
         }
@@ -107,7 +105,7 @@ public class ClusterMeta implements Serializable {
           logger.warn("can not find key : {}", key);
         }
         break;
-      case IntpProcessMeta:
+      case INTP_PROCESS_META:
         if (null == key || StringUtils.isEmpty(key)) {
           return mapInterpreterMeta;
         }
@@ -127,14 +125,14 @@ public class ClusterMeta implements Serializable {
   public Map remove(ClusterMetaType type, String key) {
     switch (type) {
-      case ServerMeta:
+      case SERVER_META:
         if (mapServerMeta.containsKey(key)) {
           return mapServerMeta.remove(key);
         } else {
           logger.warn("can not find key : {}", key);
         }
         break;
-      case IntpProcessMeta:
+      case INTP_PROCESS_META:
         if (mapInterpreterMeta.containsKey(key)) {
           return mapInterpreterMeta.remove(key);
         } else {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
index c6229bd..cf995f5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
@@ -20,6 +20,6 @@ package org.apache.zeppelin.cluster.meta;
  * Type of cluster metadata
  */
 public enum ClusterMetaType {
-  ServerMeta,
-  IntpProcessMeta
+  SERVER_META,
+  INTP_PROCESS_META
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index be792c7..d4824dc 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -18,6 +18,7 @@ package org.apache.zeppelin.conf;
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
 import 
java.util.HashMap;
@@ -28,6 +29,7 @@ import java.util.function.Predicate;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.configuration.tree.ConfigurationNode;
+import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.util.Util;
 import org.slf4j.Logger;
@@ -120,6 +122,37 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     }
     if (url == null) {
+      try {
+        Map procEnv = EnvironmentUtils.getProcEnvironment();
+        if (procEnv.containsKey("ZEPPELIN_HOME")) {
+          String zconfDir = (String) procEnv.get("ZEPPELIN_HOME");
+          File file = new File(zconfDir + File.separator
+              + "conf" + File.separator + ZEPPELIN_SITE_XML);
+          if (file.exists()) {
+            url = file.toURL();
+          }
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    if (url == null) {
+      try {
+        Map procEnv = EnvironmentUtils.getProcEnvironment();
+        if (procEnv.containsKey("ZEPPELIN_CONF_DIR")) {
+          String zconfDir = (String) procEnv.get("ZEPPELIN_CONF_DIR");
+          File file = new File(zconfDir + File.separator + ZEPPELIN_SITE_XML);
+          if (file.exists()) {
+            url = file.toURL();
+          }
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    if (url == null) {
       LOG.warn("Failed to load configuration, proceeding with a default");
       conf = new ZeppelinConfiguration();
     } else {
@@ -144,7 +177,6 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     return conf;
   }
-
   private String getStringValue(String name, String d) {
     String value = this.properties.get(name);
     if (value != null) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 1d4d231..3dc41f4 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -29,6 +29,10 @@ import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.cluster.ClusterManagerClient; +import org.apache.zeppelin.cluster.meta.ClusterMeta; +import org.apache.zeppelin.cluster.meta.ClusterMetaType; +import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; @@ -85,6 +89,7 @@ import java.net.URL; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -132,6 +137,10 @@ public class RemoteInterpreterServer extends Thread private boolean isTest; + // cluster manager client + ClusterManagerClient clusterManagerClient = ClusterManagerClient.getInstance(); + ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); + public RemoteInterpreterServer(String intpEventServerHost, int intpEventServerPort, String interpreterGroupId, @@ -178,6 +187,8 @@ public class RemoteInterpreterServer extends Thread server = new TThreadPoolServer( new TThreadPoolServer.Args(serverTransport).processor(processor)); remoteWorksResponsePool = Collections.synchronizedMap(new HashMap()); + + clusterManagerClient.start(interpreterGroupId); } @Override @@ -196,16 +207,24 @@ public class RemoteInterpreterServer extends Thread } } - if (!interrupted) { - RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId); - try { - intpEventServiceClient.registerInterpreterProcess(registerInfo); - } catch 
(TException e) { - logger.error("Error while registering interpreter: {}", registerInfo, e); + if (zconf.isClusterMode()) { + // Cluster mode, discovering interpreter processes through metadata registration + // TODO (Xun): Unified use of cluster metadata for process discovery of all operating modes + // 1. Can optimize the startup logic of the process + // 2. Can solve the problem that running the interpreter's IP in docker may be a virtual IP + putClusterMeta(); + } else { + if (!interrupted) { + RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId); try { - shutdown(); - } catch (TException e1) { - logger.warn("Exception occurs while shutting down", e1); + intpEventServiceClient.registerInterpreterProcess(registerInfo); + } catch (TException e) { + logger.error("Error while registering interpreter: {}", registerInfo, e); + try { + shutdown(); + } catch (TException e1) { + logger.warn("Exception occurs while shutting down", e1); + } } } } @@ -303,6 +322,26 @@ public class RemoteInterpreterServer extends Thread System.exit(0); } + // Submit interpreter process metadata information to cluster metadata + private void putClusterMeta() { + if (!zconf.isClusterMode()){ + return; + } + String nodeName = clusterManagerClient.getClusterNodeName(); + + // commit interpreter meta + HashMap meta = new HashMap<>(); + meta.put(ClusterMeta.NODE_NAME, nodeName); + meta.put(ClusterMeta.INTP_PROCESS_ID, interpreterGroupId); + meta.put(ClusterMeta.INTP_TSERVER_HOST, host); + meta.put(ClusterMeta.INTP_TSERVER_PORT, port); + meta.put(ClusterMeta.INTP_START_TIME, new Date()); + meta.put(ClusterMeta.HEARTBEAT, new Date()); + meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS); + + clusterManagerClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, interpreterGroupId, meta); + } + @Override public void createInterpreter(String interpreterGroupId, String sessionId, String className, Map properties, String userName) throws TException { diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java deleted file mode 100644 index 4cd82f0..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java +++ /dev/null @@ -1,914 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -/** - * Autogenerated by Thrift Compiler (0.12.0) - * - * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING - * @generated - */ -package org.apache.zeppelin.interpreter.thrift; - -@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-06-10") -public class ClusterIntpProcParameters implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterIntpProcParameters"); - - private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1); - private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2); - private static final org.apache.thrift.protocol.TField USER_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("userName", org.apache.thrift.protocol.TType.STRING, (short)3); - private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)4); - private static final org.apache.thrift.protocol.TField REPL_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("replName", org.apache.thrift.protocol.TType.STRING, (short)5); - private static final org.apache.thrift.protocol.TField DEFAULT_INTERPRETER_SETTING_FIELD_DESC = new org.apache.thrift.protocol.TField("defaultInterpreterSetting", org.apache.thrift.protocol.TType.STRING, (short)6); - - private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ClusterIntpProcParametersStandardSchemeFactory(); - private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new 
ClusterIntpProcParametersTupleSchemeFactory(); - - public @org.apache.thrift.annotation.Nullable java.lang.String host; // required - public int port; // required - public @org.apache.thrift.annotation.Nullable java.lang.String userName; // required - public @org.apache.thrift.annotation.Nullable java.lang.String noteId; // required - public @org.apache.thrift.annotation.Nullable java.lang.String replName; // required - public @org.apache.thrift.annotation.Nullable java.lang.String defaultInterpreterSetting; // required - - /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - public enum _Fields implements org.apache.thrift.TFieldIdEnum { - HOST((short)1, "host"), - PORT((short)2, "port"), - USER_NAME((short)3, "userName"), - NOTE_ID((short)4, "noteId"), - REPL_NAME((short)5, "replName"), - DEFAULT_INTERPRETER_SETTING((short)6, "defaultInterpreterSetting"); - - private static final java.util.Map byName = new java.util.HashMap(); - - static { - for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 1: // HOST - return HOST; - case 2: // PORT - return PORT; - case 3: // USER_NAME - return USER_NAME; - case 4: // NOTE_ID - return NOTE_ID; - case 5: // REPL_NAME - return REPL_NAME; - case 6: // DEFAULT_INTERPRETER_SETTING - return DEFAULT_INTERPRETER_SETTING; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. 
- */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByName(java.lang.String name) { - return byName.get(name); - } - - private final short _thriftId; - private final java.lang.String _fieldName; - - _Fields(short thriftId, java.lang.String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - public short getThriftFieldId() { - return _thriftId; - } - - public java.lang.String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - private static final int __PORT_ISSET_ID = 0; - private byte __isset_bitfield = 0; - public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); - tmpMap.put(_Fields.USER_NAME, new org.apache.thrift.meta_data.FieldMetaData("userName", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", 
org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.REPL_NAME, new org.apache.thrift.meta_data.FieldMetaData("replName", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.DEFAULT_INTERPRETER_SETTING, new org.apache.thrift.meta_data.FieldMetaData("defaultInterpreterSetting", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterIntpProcParameters.class, metaDataMap); - } - - public ClusterIntpProcParameters() { - } - - public ClusterIntpProcParameters( - java.lang.String host, - int port, - java.lang.String userName, - java.lang.String noteId, - java.lang.String replName, - java.lang.String defaultInterpreterSetting) - { - this(); - this.host = host; - this.port = port; - setPortIsSet(true); - this.userName = userName; - this.noteId = noteId; - this.replName = replName; - this.defaultInterpreterSetting = defaultInterpreterSetting; - } - - /** - * Performs a deep copy on other. 
- */ - public ClusterIntpProcParameters(ClusterIntpProcParameters other) { - __isset_bitfield = other.__isset_bitfield; - if (other.isSetHost()) { - this.host = other.host; - } - this.port = other.port; - if (other.isSetUserName()) { - this.userName = other.userName; - } - if (other.isSetNoteId()) { - this.noteId = other.noteId; - } - if (other.isSetReplName()) { - this.replName = other.replName; - } - if (other.isSetDefaultInterpreterSetting()) { - this.defaultInterpreterSetting = other.defaultInterpreterSetting; - } - } - - public ClusterIntpProcParameters deepCopy() { - return new ClusterIntpProcParameters(this); - } - - @Override - public void clear() { - this.host = null; - setPortIsSet(false); - this.port = 0; - this.userName = null; - this.noteId = null; - this.replName = null; - this.defaultInterpreterSetting = null; - } - - @org.apache.thrift.annotation.Nullable - public java.lang.String getHost() { - return this.host; - } - - public ClusterIntpProcParameters setHost(@org.apache.thrift.annotation.Nullable java.lang.String host) { - this.host = host; - return this; - } - - public void unsetHost() { - this.host = null; - } - - /** Returns true if field host is set (has been assigned a value) and false otherwise */ - public boolean isSetHost() { - return this.host != null; - } - - public void setHostIsSet(boolean value) { - if (!value) { - this.host = null; - } - } - - public int getPort() { - return this.port; - } - - public ClusterIntpProcParameters setPort(int port) { - this.port = port; - setPortIsSet(true); - return this; - } - - public void unsetPort() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID); - } - - /** Returns true if field port is set (has been assigned a value) and false otherwise */ - public boolean isSetPort() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID); - } - - public void setPortIsSet(boolean value) { - __isset_bitfield = 
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value); - } - - @org.apache.thrift.annotation.Nullable - public java.lang.String getUserName() { - return this.userName; - } - - public ClusterIntpProcParameters setUserName(@org.apache.thrift.annotation.Nullable java.lang.String userName) { - this.userName = userName; - return this; - } - - public void unsetUserName() { - this.userName = null; - } - - /** Returns true if field userName is set (has been assigned a value) and false otherwise */ - public boolean isSetUserName() { - return this.userName != null; - } - - public void setUserNameIsSet(boolean value) { - if (!value) { - this.userName = null; - } - } - - @org.apache.thrift.annotation.Nullable - public java.lang.String getNoteId() { - return this.noteId; - } - - public ClusterIntpProcParameters setNoteId(@org.apache.thrift.annotation.Nullable java.lang.String noteId) { - this.noteId = noteId; - return this; - } - - public void unsetNoteId() { - this.noteId = null; - } - - /** Returns true if field noteId is set (has been assigned a value) and false otherwise */ - public boolean isSetNoteId() { - return this.noteId != null; - } - - public void setNoteIdIsSet(boolean value) { - if (!value) { - this.noteId = null; - } - } - - @org.apache.thrift.annotation.Nullable - public java.lang.String getReplName() { - return this.replName; - } - - public ClusterIntpProcParameters setReplName(@org.apache.thrift.annotation.Nullable java.lang.String replName) { - this.replName = replName; - return this; - } - - public void unsetReplName() { - this.replName = null; - } - - /** Returns true if field replName is set (has been assigned a value) and false otherwise */ - public boolean isSetReplName() { - return this.replName != null; - } - - public void setReplNameIsSet(boolean value) { - if (!value) { - this.replName = null; - } - } - - @org.apache.thrift.annotation.Nullable - public java.lang.String getDefaultInterpreterSetting() { - return 
this.defaultInterpreterSetting; - } - - public ClusterIntpProcParameters setDefaultInterpreterSetting(@org.apache.thrift.annotation.Nullable java.lang.String defaultInterpreterSetting) { - this.defaultInterpreterSetting = defaultInterpreterSetting; - return this; - } - - public void unsetDefaultInterpreterSetting() { - this.defaultInterpreterSetting = null; - } - - /** Returns true if field defaultInterpreterSetting is set (has been assigned a value) and false otherwise */ - public boolean isSetDefaultInterpreterSetting() { - return this.defaultInterpreterSetting != null; - } - - public void setDefaultInterpreterSettingIsSet(boolean value) { - if (!value) { - this.defaultInterpreterSetting = null; - } - } - - public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { - switch (field) { - case HOST: - if (value == null) { - unsetHost(); - } else { - setHost((java.lang.String)value); - } - break; - - case PORT: - if (value == null) { - unsetPort(); - } else { - setPort((java.lang.Integer)value); - } - break; - - case USER_NAME: - if (value == null) { - unsetUserName(); - } else { - setUserName((java.lang.String)value); - } - break; - - case NOTE_ID: - if (value == null) { - unsetNoteId(); - } else { - setNoteId((java.lang.String)value); - } - break; - - case REPL_NAME: - if (value == null) { - unsetReplName(); - } else { - setReplName((java.lang.String)value); - } - break; - - case DEFAULT_INTERPRETER_SETTING: - if (value == null) { - unsetDefaultInterpreterSetting(); - } else { - setDefaultInterpreterSetting((java.lang.String)value); - } - break; - - } - } - - @org.apache.thrift.annotation.Nullable - public java.lang.Object getFieldValue(_Fields field) { - switch (field) { - case HOST: - return getHost(); - - case PORT: - return getPort(); - - case USER_NAME: - return getUserName(); - - case NOTE_ID: - return getNoteId(); - - case REPL_NAME: - return getReplName(); - - case DEFAULT_INTERPRETER_SETTING: - return 
getDefaultInterpreterSetting(); - - } - throw new java.lang.IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - public boolean isSet(_Fields field) { - if (field == null) { - throw new java.lang.IllegalArgumentException(); - } - - switch (field) { - case HOST: - return isSetHost(); - case PORT: - return isSetPort(); - case USER_NAME: - return isSetUserName(); - case NOTE_ID: - return isSetNoteId(); - case REPL_NAME: - return isSetReplName(); - case DEFAULT_INTERPRETER_SETTING: - return isSetDefaultInterpreterSetting(); - } - throw new java.lang.IllegalStateException(); - } - - @Override - public boolean equals(java.lang.Object that) { - if (that == null) - return false; - if (that instanceof ClusterIntpProcParameters) - return this.equals((ClusterIntpProcParameters)that); - return false; - } - - public boolean equals(ClusterIntpProcParameters that) { - if (that == null) - return false; - if (this == that) - return true; - - boolean this_present_host = true && this.isSetHost(); - boolean that_present_host = true && that.isSetHost(); - if (this_present_host || that_present_host) { - if (!(this_present_host && that_present_host)) - return false; - if (!this.host.equals(that.host)) - return false; - } - - boolean this_present_port = true; - boolean that_present_port = true; - if (this_present_port || that_present_port) { - if (!(this_present_port && that_present_port)) - return false; - if (this.port != that.port) - return false; - } - - boolean this_present_userName = true && this.isSetUserName(); - boolean that_present_userName = true && that.isSetUserName(); - if (this_present_userName || that_present_userName) { - if (!(this_present_userName && that_present_userName)) - return false; - if (!this.userName.equals(that.userName)) - return false; - } - - boolean this_present_noteId = true && this.isSetNoteId(); - boolean that_present_noteId = true && that.isSetNoteId(); - if 
(this_present_noteId || that_present_noteId) { - if (!(this_present_noteId && that_present_noteId)) - return false; - if (!this.noteId.equals(that.noteId)) - return false; - } - - boolean this_present_replName = true && this.isSetReplName(); - boolean that_present_replName = true && that.isSetReplName(); - if (this_present_replName || that_present_replName) { - if (!(this_present_replName && that_present_replName)) - return false; - if (!this.replName.equals(that.replName)) - return false; - } - - boolean this_present_defaultInterpreterSetting = true && this.isSetDefaultInterpreterSetting(); - boolean that_present_defaultInterpreterSetting = true && that.isSetDefaultInterpreterSetting(); - if (this_present_defaultInterpreterSetting || that_present_defaultInterpreterSetting) { - if (!(this_present_defaultInterpreterSetting && that_present_defaultInterpreterSetting)) - return false; - if (!this.defaultInterpreterSetting.equals(that.defaultInterpreterSetting)) - return false; - } - - return true; - } - - @Override - public int hashCode() { - int hashCode = 1; - - hashCode = hashCode * 8191 + ((isSetHost()) ? 131071 : 524287); - if (isSetHost()) - hashCode = hashCode * 8191 + host.hashCode(); - - hashCode = hashCode * 8191 + port; - - hashCode = hashCode * 8191 + ((isSetUserName()) ? 131071 : 524287); - if (isSetUserName()) - hashCode = hashCode * 8191 + userName.hashCode(); - - hashCode = hashCode * 8191 + ((isSetNoteId()) ? 131071 : 524287); - if (isSetNoteId()) - hashCode = hashCode * 8191 + noteId.hashCode(); - - hashCode = hashCode * 8191 + ((isSetReplName()) ? 131071 : 524287); - if (isSetReplName()) - hashCode = hashCode * 8191 + replName.hashCode(); - - hashCode = hashCode * 8191 + ((isSetDefaultInterpreterSetting()) ? 
131071 : 524287); - if (isSetDefaultInterpreterSetting()) - hashCode = hashCode * 8191 + defaultInterpreterSetting.hashCode(); - - return hashCode; - } - - @Override - public int compareTo(ClusterIntpProcParameters other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = java.lang.Boolean.valueOf(isSetHost()).compareTo(other.isSetHost()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetHost()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.valueOf(isSetPort()).compareTo(other.isSetPort()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetPort()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.valueOf(isSetUserName()).compareTo(other.isSetUserName()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetUserName()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.userName, other.userName); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetNoteId()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.valueOf(isSetReplName()).compareTo(other.isSetReplName()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetReplName()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replName, other.replName); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison 
= java.lang.Boolean.valueOf(isSetDefaultInterpreterSetting()).compareTo(other.isSetDefaultInterpreterSetting()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetDefaultInterpreterSetting()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.defaultInterpreterSetting, other.defaultInterpreterSetting); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - @org.apache.thrift.annotation.Nullable - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - scheme(iprot).read(iprot, this); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - scheme(oprot).write(oprot, this); - } - - @Override - public java.lang.String toString() { - java.lang.StringBuilder sb = new java.lang.StringBuilder("ClusterIntpProcParameters("); - boolean first = true; - - sb.append("host:"); - if (this.host == null) { - sb.append("null"); - } else { - sb.append(this.host); - } - first = false; - if (!first) sb.append(", "); - sb.append("port:"); - sb.append(this.port); - first = false; - if (!first) sb.append(", "); - sb.append("userName:"); - if (this.userName == null) { - sb.append("null"); - } else { - sb.append(this.userName); - } - first = false; - if (!first) sb.append(", "); - sb.append("noteId:"); - if (this.noteId == null) { - sb.append("null"); - } else { - sb.append(this.noteId); - } - first = false; - if (!first) sb.append(", "); - sb.append("replName:"); - if (this.replName == null) { - sb.append("null"); - } else { - sb.append(this.replName); - } - first = false; - if (!first) sb.append(", "); - sb.append("defaultInterpreterSetting:"); - if (this.defaultInterpreterSetting == null) { - sb.append("null"); - } else { - sb.append(this.defaultInterpreterSetting); - } - first = false; - sb.append(")"); - return sb.toString(); - } - - 
public void validate() throws org.apache.thrift.TException { - // check for required fields - // check for sub-struct validity - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { - try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bitfield = 0; - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class ClusterIntpProcParametersStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - public ClusterIntpProcParametersStandardScheme getScheme() { - return new ClusterIntpProcParametersStandardScheme(); - } - } - - private static class ClusterIntpProcParametersStandardScheme extends org.apache.thrift.scheme.StandardScheme { - - public void read(org.apache.thrift.protocol.TProtocol iprot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 1: // HOST - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.host = iprot.readString(); - struct.setHostIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 2: // PORT - if (schemeField.type == org.apache.thrift.protocol.TType.I32) 
{ - struct.port = iprot.readI32(); - struct.setPortIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 3: // USER_NAME - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.userName = iprot.readString(); - struct.setUserNameIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 4: // NOTE_ID - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.noteId = iprot.readString(); - struct.setNoteIdIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 5: // REPL_NAME - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.replName = iprot.readString(); - struct.setReplNameIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 6: // DEFAULT_INTERPRETER_SETTING - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.defaultInterpreterSetting = iprot.readString(); - struct.setDefaultInterpreterSettingIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - - // check for required fields of primitive type, which can't be checked in the validate method - struct.validate(); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.host != null) { - oprot.writeFieldBegin(HOST_FIELD_DESC); - oprot.writeString(struct.host); - oprot.writeFieldEnd(); - } - oprot.writeFieldBegin(PORT_FIELD_DESC); - oprot.writeI32(struct.port); - oprot.writeFieldEnd(); - if (struct.userName != null) { - 
oprot.writeFieldBegin(USER_NAME_FIELD_DESC); - oprot.writeString(struct.userName); - oprot.writeFieldEnd(); - } - if (struct.noteId != null) { - oprot.writeFieldBegin(NOTE_ID_FIELD_DESC); - oprot.writeString(struct.noteId); - oprot.writeFieldEnd(); - } - if (struct.replName != null) { - oprot.writeFieldBegin(REPL_NAME_FIELD_DESC); - oprot.writeString(struct.replName); - oprot.writeFieldEnd(); - } - if (struct.defaultInterpreterSetting != null) { - oprot.writeFieldBegin(DEFAULT_INTERPRETER_SETTING_FIELD_DESC); - oprot.writeString(struct.defaultInterpreterSetting); - oprot.writeFieldEnd(); - } - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class ClusterIntpProcParametersTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - public ClusterIntpProcParametersTupleScheme getScheme() { - return new ClusterIntpProcParametersTupleScheme(); - } - } - - private static class ClusterIntpProcParametersTupleScheme extends org.apache.thrift.scheme.TupleScheme { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet optionals = new java.util.BitSet(); - if (struct.isSetHost()) { - optionals.set(0); - } - if (struct.isSetPort()) { - optionals.set(1); - } - if (struct.isSetUserName()) { - optionals.set(2); - } - if (struct.isSetNoteId()) { - optionals.set(3); - } - if (struct.isSetReplName()) { - optionals.set(4); - } - if (struct.isSetDefaultInterpreterSetting()) { - optionals.set(5); - } - oprot.writeBitSet(optionals, 6); - if (struct.isSetHost()) { - oprot.writeString(struct.host); - } - if (struct.isSetPort()) { - oprot.writeI32(struct.port); - } - if (struct.isSetUserName()) { - oprot.writeString(struct.userName); - } - if (struct.isSetNoteId()) { - oprot.writeString(struct.noteId); - } - if (struct.isSetReplName()) 
{ - oprot.writeString(struct.replName); - } - if (struct.isSetDefaultInterpreterSetting()) { - oprot.writeString(struct.defaultInterpreterSetting); - } - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(6); - if (incoming.get(0)) { - struct.host = iprot.readString(); - struct.setHostIsSet(true); - } - if (incoming.get(1)) { - struct.port = iprot.readI32(); - struct.setPortIsSet(true); - } - if (incoming.get(2)) { - struct.userName = iprot.readString(); - struct.setUserNameIsSet(true); - } - if (incoming.get(3)) { - struct.noteId = iprot.readString(); - struct.setNoteIdIsSet(true); - } - if (incoming.get(4)) { - struct.replName = iprot.readString(); - struct.setReplNameIsSet(true); - } - if (incoming.get(5)) { - struct.defaultInterpreterSetting = iprot.readString(); - struct.setDefaultInterpreterSettingIsSet(true); - } - } - } - - private static S scheme(org.apache.thrift.protocol.TProtocol proto) { - return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); - } -} - diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java deleted file mode 100644 index 5c46b0e..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java +++ /dev/null @@ -1,995 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.12.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.zeppelin.interpreter.thrift;
-
-@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-06-10")
-public class ClusterManagerService {
-
-  public interface Iface {
-
-    public boolean createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException;
-
-  }
-
-  public interface AsyncIface {
-
-    public void createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
-
-  }
-
-  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
-    public static class Factory implements org.apache.thrift.TServiceClientFactory {
-      public Factory() {}
-      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
-        return new Client(prot);
-      }
-      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
-        return new Client(iprot, oprot);
-      }
-    }
-
-    public Client(org.apache.thrift.protocol.TProtocol prot)
-    {
-      super(prot, prot);
-    }
-
-    public
Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { - super(iprot, oprot); - } - - public boolean createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException - { - send_createClusterInterpreterProcess(intpProcParameters); - return recv_createClusterInterpreterProcess(); - } - - public void send_createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException - { - createClusterInterpreterProcess_args args = new createClusterInterpreterProcess_args(); - args.setIntpProcParameters(intpProcParameters); - sendBase("createClusterInterpreterProcess", args); - } - - public boolean recv_createClusterInterpreterProcess() throws org.apache.thrift.TException - { - createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result(); - receiveBase(result, "createClusterInterpreterProcess"); - if (result.isSetSuccess()) { - return result.success; - } - throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "createClusterInterpreterProcess failed: unknown result"); - } - - } - public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { - public static class Factory implements org.apache.thrift.async.TAsyncClientFactory { - private org.apache.thrift.async.TAsyncClientManager clientManager; - private org.apache.thrift.protocol.TProtocolFactory protocolFactory; - public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) { - this.clientManager = clientManager; - this.protocolFactory = protocolFactory; - } - public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) { - return new AsyncClient(protocolFactory, clientManager, transport); - } - } - - public AsyncClient(org.apache.thrift.protocol.TProtocolFactory 
protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) { - super(protocolFactory, clientManager, transport); - } - - public void createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { - checkReady(); - createClusterInterpreterProcess_call method_call = new createClusterInterpreterProcess_call(intpProcParameters, resultHandler, this, ___protocolFactory, ___transport); - this.___currentMethod = method_call; - ___manager.call(method_call); - } - - public static class createClusterInterpreterProcess_call extends org.apache.thrift.async.TAsyncMethodCall { - private ClusterIntpProcParameters intpProcParameters; - public createClusterInterpreterProcess_call(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { - super(client, protocolFactory, transport, resultHandler, false); - this.intpProcParameters = intpProcParameters; - } - - public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { - prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("createClusterInterpreterProcess", org.apache.thrift.protocol.TMessageType.CALL, 0)); - createClusterInterpreterProcess_args args = new createClusterInterpreterProcess_args(); - args.setIntpProcParameters(intpProcParameters); - args.write(prot); - prot.writeMessageEnd(); - } - - public java.lang.Boolean getResult() throws org.apache.thrift.TException { - if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { - throw new java.lang.IllegalStateException("Method call not finished!"); - } - 
org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); - org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); - return (new Client(prot)).recv_createClusterInterpreterProcess(); - } - } - - } - - public static class Processor extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor { - private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName()); - public Processor(I iface) { - super(iface, getProcessMap(new java.util.HashMap>())); - } - - protected Processor(I iface, java.util.Map> processMap) { - super(iface, getProcessMap(processMap)); - } - - private static java.util.Map> getProcessMap(java.util.Map> processMap) { - processMap.put("createClusterInterpreterProcess", new createClusterInterpreterProcess()); - return processMap; - } - - public static class createClusterInterpreterProcess extends org.apache.thrift.ProcessFunction { - public createClusterInterpreterProcess() { - super("createClusterInterpreterProcess"); - } - - public createClusterInterpreterProcess_args getEmptyArgsInstance() { - return new createClusterInterpreterProcess_args(); - } - - protected boolean isOneway() { - return false; - } - - @Override - protected boolean rethrowUnhandledExceptions() { - return false; - } - - public createClusterInterpreterProcess_result getResult(I iface, createClusterInterpreterProcess_args args) throws org.apache.thrift.TException { - createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result(); - result.success = iface.createClusterInterpreterProcess(args.intpProcParameters); - result.setSuccessIsSet(true); - return result; - } - } - - } - - public static class AsyncProcessor extends org.apache.thrift.TBaseAsyncProcessor { - private static final org.slf4j.Logger _LOGGER = 
org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName()); - public AsyncProcessor(I iface) { - super(iface, getProcessMap(new java.util.HashMap>())); - } - - protected AsyncProcessor(I iface, java.util.Map> processMap) { - super(iface, getProcessMap(processMap)); - } - - private static java.util.Map> getProcessMap(java.util.Map> processMap) { - processMap.put("createClusterInterpreterProcess", new createClusterInterpreterProcess()); - return processMap; - } - - public static class createClusterInterpreterProcess extends org.apache.thrift.AsyncProcessFunction { - public createClusterInterpreterProcess() { - super("createClusterInterpreterProcess"); - } - - public createClusterInterpreterProcess_args getEmptyArgsInstance() { - return new createClusterInterpreterProcess_args(); - } - - public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { - final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { - public void onComplete(java.lang.Boolean o) { - createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result(); - result.success = o; - result.setSuccessIsSet(true); - try { - fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); - } catch (org.apache.thrift.transport.TTransportException e) { - _LOGGER.error("TTransportException writing to internal frame buffer", e); - fb.close(); - } catch (java.lang.Exception e) { - _LOGGER.error("Exception writing to internal frame buffer", e); - onError(e); - } - } - public void onError(java.lang.Exception e) { - byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; - org.apache.thrift.TSerializable msg; - createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result(); - if (e instanceof org.apache.thrift.transport.TTransportException) { - 
_LOGGER.error("TTransportException inside handler", e); - fb.close(); - return; - } else if (e instanceof org.apache.thrift.TApplicationException) { - _LOGGER.error("TApplicationException inside handler", e); - msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; - msg = (org.apache.thrift.TApplicationException)e; - } else { - _LOGGER.error("Exception inside handler", e); - msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; - msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); - } - try { - fcall.sendResponse(fb,msg,msgType,seqid); - } catch (java.lang.Exception ex) { - _LOGGER.error("Exception writing to internal frame buffer", ex); - fb.close(); - } - } - }; - } - - protected boolean isOneway() { - return false; - } - - public void start(I iface, createClusterInterpreterProcess_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { - iface.createClusterInterpreterProcess(args.intpProcParameters,resultHandler); - } - } - - } - - public static class createClusterInterpreterProcess_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_args"); - - private static final org.apache.thrift.protocol.TField INTP_PROC_PARAMETERS_FIELD_DESC = new org.apache.thrift.protocol.TField("intpProcParameters", org.apache.thrift.protocol.TType.STRUCT, (short)1); - - private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new createClusterInterpreterProcess_argsStandardSchemeFactory(); - private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new createClusterInterpreterProcess_argsTupleSchemeFactory(); - - public @org.apache.thrift.annotation.Nullable ClusterIntpProcParameters intpProcParameters; // required - - /** 
The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - public enum _Fields implements org.apache.thrift.TFieldIdEnum { - INTP_PROC_PARAMETERS((short)1, "intpProcParameters"); - - private static final java.util.Map byName = new java.util.HashMap(); - - static { - for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 1: // INTP_PROC_PARAMETERS - return INTP_PROC_PARAMETERS; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. - */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. 
- */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByName(java.lang.String name) { - return byName.get(name); - } - - private final short _thriftId; - private final java.lang.String _fieldName; - - _Fields(short thriftId, java.lang.String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - public short getThriftFieldId() { - return _thriftId; - } - - public java.lang.String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.INTP_PROC_PARAMETERS, new org.apache.thrift.meta_data.FieldMetaData("intpProcParameters", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ClusterIntpProcParameters.class))); - metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_args.class, metaDataMap); - } - - public createClusterInterpreterProcess_args() { - } - - public createClusterInterpreterProcess_args( - ClusterIntpProcParameters intpProcParameters) - { - this(); - this.intpProcParameters = intpProcParameters; - } - - /** - * Performs a deep copy on other. 
- */ - public createClusterInterpreterProcess_args(createClusterInterpreterProcess_args other) { - if (other.isSetIntpProcParameters()) { - this.intpProcParameters = new ClusterIntpProcParameters(other.intpProcParameters); - } - } - - public createClusterInterpreterProcess_args deepCopy() { - return new createClusterInterpreterProcess_args(this); - } - - @Override - public void clear() { - this.intpProcParameters = null; - } - - @org.apache.thrift.annotation.Nullable - public ClusterIntpProcParameters getIntpProcParameters() { - return this.intpProcParameters; - } - - public createClusterInterpreterProcess_args setIntpProcParameters(@org.apache.thrift.annotation.Nullable ClusterIntpProcParameters intpProcParameters) { - this.intpProcParameters = intpProcParameters; - return this; - } - - public void unsetIntpProcParameters() { - this.intpProcParameters = null; - } - - /** Returns true if field intpProcParameters is set (has been assigned a value) and false otherwise */ - public boolean isSetIntpProcParameters() { - return this.intpProcParameters != null; - } - - public void setIntpProcParametersIsSet(boolean value) { - if (!value) { - this.intpProcParameters = null; - } - } - - public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { - switch (field) { - case INTP_PROC_PARAMETERS: - if (value == null) { - unsetIntpProcParameters(); - } else { - setIntpProcParameters((ClusterIntpProcParameters)value); - } - break; - - } - } - - @org.apache.thrift.annotation.Nullable - public java.lang.Object getFieldValue(_Fields field) { - switch (field) { - case INTP_PROC_PARAMETERS: - return getIntpProcParameters(); - - } - throw new java.lang.IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - public boolean isSet(_Fields field) { - if (field == null) { - throw new java.lang.IllegalArgumentException(); - } - - switch (field) { - case 
INTP_PROC_PARAMETERS: - return isSetIntpProcParameters(); - } - throw new java.lang.IllegalStateException(); - } - - @Override - public boolean equals(java.lang.Object that) { - if (that == null) - return false; - if (that instanceof createClusterInterpreterProcess_args) - return this.equals((createClusterInterpreterProcess_args)that); - return false; - } - - public boolean equals(createClusterInterpreterProcess_args that) { - if (that == null) - return false; - if (this == that) - return true; - - boolean this_present_intpProcParameters = true && this.isSetIntpProcParameters(); - boolean that_present_intpProcParameters = true && that.isSetIntpProcParameters(); - if (this_present_intpProcParameters || that_present_intpProcParameters) { - if (!(this_present_intpProcParameters && that_present_intpProcParameters)) - return false; - if (!this.intpProcParameters.equals(that.intpProcParameters)) - return false; - } - - return true; - } - - @Override - public int hashCode() { - int hashCode = 1; - - hashCode = hashCode * 8191 + ((isSetIntpProcParameters()) ? 
131071 : 524287); - if (isSetIntpProcParameters()) - hashCode = hashCode * 8191 + intpProcParameters.hashCode(); - - return hashCode; - } - - @Override - public int compareTo(createClusterInterpreterProcess_args other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = java.lang.Boolean.valueOf(isSetIntpProcParameters()).compareTo(other.isSetIntpProcParameters()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetIntpProcParameters()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.intpProcParameters, other.intpProcParameters); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - @org.apache.thrift.annotation.Nullable - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - scheme(iprot).read(iprot, this); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - scheme(oprot).write(oprot, this); - } - - @Override - public java.lang.String toString() { - java.lang.StringBuilder sb = new java.lang.StringBuilder("createClusterInterpreterProcess_args("); - boolean first = true; - - sb.append("intpProcParameters:"); - if (this.intpProcParameters == null) { - sb.append("null"); - } else { - sb.append(this.intpProcParameters); - } - first = false; - sb.append(")"); - return sb.toString(); - } - - public void validate() throws org.apache.thrift.TException { - // check for required fields - // check for sub-struct validity - if (intpProcParameters != null) { - intpProcParameters.validate(); - } - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { - try { - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class createClusterInterpreterProcess_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - public createClusterInterpreterProcess_argsStandardScheme getScheme() { - return new createClusterInterpreterProcess_argsStandardScheme(); - } - } - - private static class createClusterInterpreterProcess_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme { - - public void read(org.apache.thrift.protocol.TProtocol iprot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 1: // INTP_PROC_PARAMETERS - if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { - struct.intpProcParameters = new ClusterIntpProcParameters(); - struct.intpProcParameters.read(iprot); - struct.setIntpProcParametersIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - - // check for required fields of primitive type, which can't be checked in the validate method - struct.validate(); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot, createClusterInterpreterProcess_args 
struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.intpProcParameters != null) { - oprot.writeFieldBegin(INTP_PROC_PARAMETERS_FIELD_DESC); - struct.intpProcParameters.write(oprot); - oprot.writeFieldEnd(); - } - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class createClusterInterpreterProcess_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - public createClusterInterpreterProcess_argsTupleScheme getScheme() { - return new createClusterInterpreterProcess_argsTupleScheme(); - } - } - - private static class createClusterInterpreterProcess_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet optionals = new java.util.BitSet(); - if (struct.isSetIntpProcParameters()) { - optionals.set(0); - } - oprot.writeBitSet(optionals, 1); - if (struct.isSetIntpProcParameters()) { - struct.intpProcParameters.write(oprot); - } - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(1); - if (incoming.get(0)) { - struct.intpProcParameters = new ClusterIntpProcParameters(); - struct.intpProcParameters.read(iprot); - struct.setIntpProcParametersIsSet(true); - } - } - } - - private static S scheme(org.apache.thrift.protocol.TProtocol proto) { - return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? 
STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); - } - } - - public static class createClusterInterpreterProcess_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_result"); - - private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)0); - - private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new createClusterInterpreterProcess_resultStandardSchemeFactory(); - private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new createClusterInterpreterProcess_resultTupleSchemeFactory(); - - public boolean success; // required - - /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - public enum _Fields implements org.apache.thrift.TFieldIdEnum { - SUCCESS((short)0, "success"); - - private static final java.util.Map byName = new java.util.HashMap(); - - static { - for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 0: // SUCCESS - return SUCCESS; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. 
- */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByName(java.lang.String name) { - return byName.get(name); - } - - private final short _thriftId; - private final java.lang.String _fieldName; - - _Fields(short thriftId, java.lang.String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - public short getThriftFieldId() { - return _thriftId; - } - - public java.lang.String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - private static final int __SUCCESS_ISSET_ID = 0; - private byte __isset_bitfield = 0; - public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); - metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_result.class, metaDataMap); - } - - public createClusterInterpreterProcess_result() { - } - - public createClusterInterpreterProcess_result( - boolean success) - { - this(); - this.success = success; - setSuccessIsSet(true); - } - - /** - * Performs a deep copy on other. 
- */ - public createClusterInterpreterProcess_result(createClusterInterpreterProcess_result other) { - __isset_bitfield = other.__isset_bitfield; - this.success = other.success; - } - - public createClusterInterpreterProcess_result deepCopy() { - return new createClusterInterpreterProcess_result(this); - } - - @Override - public void clear() { - setSuccessIsSet(false); - this.success = false; - } - - public boolean isSuccess() { - return this.success; - } - - public createClusterInterpreterProcess_result setSuccess(boolean success) { - this.success = success; - setSuccessIsSet(true); - return this; - } - - public void unsetSuccess() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID); - } - - /** Returns true if field success is set (has been assigned a value) and false otherwise */ - public boolean isSetSuccess() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID); - } - - public void setSuccessIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value); - } - - public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { - switch (field) { - case SUCCESS: - if (value == null) { - unsetSuccess(); - } else { - setSuccess((java.lang.Boolean)value); - } - break; - - } - } - - @org.apache.thrift.annotation.Nullable - public java.lang.Object getFieldValue(_Fields field) { - switch (field) { - case SUCCESS: - return isSuccess(); - - } - throw new java.lang.IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - public boolean isSet(_Fields field) { - if (field == null) { - throw new java.lang.IllegalArgumentException(); - } - - switch (field) { - case SUCCESS: - return isSetSuccess(); - } - throw new java.lang.IllegalStateException(); - } - - @Override - public boolean 
equals(java.lang.Object that) { - if (that == null) - return false; - if (that instanceof createClusterInterpreterProcess_result) - return this.equals((createClusterInterpreterProcess_result)that); - return false; - } - - public boolean equals(createClusterInterpreterProcess_result that) { - if (that == null) - return false; - if (this == that) - return true; - - boolean this_present_success = true; - boolean that_present_success = true; - if (this_present_success || that_present_success) { - if (!(this_present_success && that_present_success)) - return false; - if (this.success != that.success) - return false; - } - - return true; - } - - @Override - public int hashCode() { - int hashCode = 1; - - hashCode = hashCode * 8191 + ((success) ? 131071 : 524287); - - return hashCode; - } - - @Override - public int compareTo(createClusterInterpreterProcess_result other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = java.lang.Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetSuccess()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - @org.apache.thrift.annotation.Nullable - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - scheme(iprot).read(iprot, this); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - scheme(oprot).write(oprot, this); - } - - @Override - public java.lang.String toString() { - java.lang.StringBuilder sb = new java.lang.StringBuilder("createClusterInterpreterProcess_result("); - boolean first = true; - - sb.append("success:"); - 
sb.append(this.success); - first = false; - sb.append(")"); - return sb.toString(); - } - - public void validate() throws org.apache.thrift.TException { - // check for required fields - // check for sub-struct validity - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { - try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bitfield = 0; - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class createClusterInterpreterProcess_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - public createClusterInterpreterProcess_resultStandardScheme getScheme() { - return new createClusterInterpreterProcess_resultStandardScheme(); - } - } - - private static class createClusterInterpreterProcess_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme { - - public void read(org.apache.thrift.protocol.TProtocol iprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 0: // SUCCESS - if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { - struct.success = iprot.readBool(); - struct.setSuccessIsSet(true); - } else { - 
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - - // check for required fields of primitive type, which can't be checked in the validate method - struct.validate(); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.isSetSuccess()) { - oprot.writeFieldBegin(SUCCESS_FIELD_DESC); - oprot.writeBool(struct.success); - oprot.writeFieldEnd(); - } - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class createClusterInterpreterProcess_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - public createClusterInterpreterProcess_resultTupleScheme getScheme() { - return new createClusterInterpreterProcess_resultTupleScheme(); - } - } - - private static class createClusterInterpreterProcess_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet optionals = new java.util.BitSet(); - if (struct.isSetSuccess()) { - optionals.set(0); - } - oprot.writeBitSet(optionals, 1); - if (struct.isSetSuccess()) { - oprot.writeBool(struct.success); - } - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(1); - if (incoming.get(0)) { - 
struct.success = iprot.readBool(); - struct.setSuccessIsSet(true); - } - } - } - - private static S scheme(org.apache.thrift.protocol.TProtocol proto) { - return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); - } - } - -} diff --git a/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift b/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift deleted file mode 100644 index c6f208e..0000000 --- a/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -namespace java org.apache.zeppelin.interpreter.thrift - -struct ClusterIntpProcParameters { - 1: string host, - 2: i32 port, - 3: string userName, - 4: string noteId, - 5: string replName, - 6: string defaultInterpreterSetting -} - -service ClusterManagerService { - bool createClusterInterpreterProcess(1: ClusterIntpProcParameters intpProcParameters); -} diff --git a/zeppelin-interpreter/src/main/thrift/genthrift.sh b/zeppelin-interpreter/src/main/thrift/genthrift.sh index 31efeae..23a295a 100755 --- a/zeppelin-interpreter/src/main/thrift/genthrift.sh +++ b/zeppelin-interpreter/src/main/thrift/genthrift.sh @@ -21,7 +21,6 @@ rm -rf gen-java rm -rf ../java/org/apache/zeppelin/interpreter/thrift thrift --gen java RemoteInterpreterService.thrift thrift --gen java RemoteInterpreterEventService.thrift -thrift --gen java ClusterManagerService.thrift for file in gen-java/org/apache/zeppelin/interpreter/thrift/* ; do cat java_license_header.txt ${file} > ${file}.tmp mv -f ${file}.tmp ${file} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java new file mode 100644 index 0000000..e2ce781 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import org.apache.zeppelin.cluster.meta.ClusterMetaType; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class ClusterMultiNodeTest { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterMultiNodeTest.class); + + private static List clusterServers = new ArrayList<>(); + private static ClusterManagerClient clusterClient = null; + + static final String metaKey = "ClusterMultiNodeTestKey"; + + @BeforeClass + public static void startCluster() throws IOException, InterruptedException { + LOGGER.info("startCluster >>>"); + + String clusterAddrList = ""; + String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); + for (int i = 0; i < 3; i ++) { + // Set the cluster IP and port + int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + clusterAddrList += zServerHost + ":" + zServerPort; + if (i != 2) { + clusterAddrList += ","; + } + } + ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); + zconf.setClusterAddress(clusterAddrList); + + // mock cluster manager server + String 
cluster[] = clusterAddrList.split(","); + try { + for (int i = 0; i < 3; i ++) { + String[] parts = cluster[i].split(":"); + String clusterHost = parts[0]; + int clusterPort = Integer.valueOf(parts[1]); + + Class clazz = ClusterManagerServer.class; + Constructor constructor = clazz.getDeclaredConstructor(); + constructor.setAccessible(true); + ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance(); + clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort); + + clusterServers.add(clusterServer); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + + for (ClusterManagerServer clusterServer : clusterServers) { + clusterServer.start(); + } + + // mock cluster manager client + clusterClient = ClusterManagerClient.getInstance(); + clusterClient.start(metaKey); + + // Waiting for cluster startup + int wait = 0; + while(wait++ < 100) { + if (clusterIsStartup() && clusterClient.raftInitialized()) { + LOGGER.info("wait {}(ms) found cluster leader", wait*3000); + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + + Thread.sleep(3000); + assertEquals(true, clusterIsStartup()); + LOGGER.info("startCluster <<<"); + + getClusterServerMeta(); + } + + @AfterClass + public static void stopCluster() { + LOGGER.info("stopCluster >>>"); + if (null != clusterClient) { + clusterClient.shutdown(); + } + for (ClusterManagerServer clusterServer : clusterServers) { + clusterServer.shutdown(); + } + LOGGER.info("stopCluster <<<"); + } + + static boolean clusterIsStartup() { + boolean foundLeader = false; + for (ClusterManagerServer clusterServer : clusterServers) { + if (!clusterServer.raftInitialized()) { + LOGGER.warn("clusterServer not Initialized!"); + return false; + } + if (clusterServer.isClusterLeader()) { + foundLeader = true; + } + } + + if (!foundLeader) { + LOGGER.warn("Can not found leader!"); + return false; + } + + return true; + } + + public 
static void getClusterServerMeta() { + LOGGER.info("getClusterServerMeta >>>"); + // Get metadata for all services + Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); + LOGGER.info(srvMeta.toString()); + + Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); + LOGGER.info(intpMeta.toString()); + + assertNotNull(srvMeta); + assertEquals(true, (srvMeta instanceof HashMap)); + HashMap hashMap = (HashMap) srvMeta; + + assertEquals(hashMap.size(), 3); + LOGGER.info("getClusterServerMeta <<<"); + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java similarity index 69% copy from zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java copy to zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java index ef4d7fd..59853d4 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java @@ -1,4 +1,3 @@ -package org.apache.zeppelin.cluster; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,11 +14,13 @@ package org.apache.zeppelin.cluster; * See the License for the specific language governing permissions and * limitations under the License. 
*/ +package org.apache.zeppelin.cluster; import org.apache.zeppelin.cluster.meta.ClusterMeta; import org.apache.zeppelin.cluster.meta.ClusterMetaType; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -31,21 +32,20 @@ import java.util.HashMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -public class ClusterManagerTest { - private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerTest.class); - - private static ClusterManagerServer clusterManagerServer = null; - private static ClusterManagerClient clusterManagerClient = null; +public class ClusterSingleNodeTest { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterSingleNodeTest.class); + private static ZeppelinConfiguration zconf; - private static ZeppelinConfiguration zconf = null; + private static ClusterManagerServer clusterServer = null; + private static ClusterManagerClient clusterClient = null; static String zServerHost; static int zServerPort; - static final String metaKey = "ClusterManagerTestKey"; + static final String metaKey = "ClusterSingleNodeTestKey"; @BeforeClass - public static void initClusterEnv() throws IOException, InterruptedException { - LOGGER.info("initClusterEnv >>>"); + public static void startCluster() throws IOException, InterruptedException { + LOGGER.info("startCluster >>>"); zconf = ZeppelinConfiguration.create(); @@ -55,19 +55,19 @@ public class ClusterManagerTest { zconf.setClusterAddress(zServerHost + ":" + zServerPort); // mock cluster manager server - clusterManagerServer = ClusterManagerServer.getInstance(); - clusterManagerServer.start(null); + clusterServer = ClusterManagerServer.getInstance(); + clusterServer.start(); // mock cluster manager client - clusterManagerClient = ClusterManagerClient.getInstance(); - 
clusterManagerClient.start(metaKey); + clusterClient = ClusterManagerClient.getInstance(); + clusterClient.start(metaKey); // Waiting for cluster startup int wait = 0; while(wait++ < 100) { - if (clusterManagerServer.isClusterLeader() - && clusterManagerServer.raftInitialized() - && clusterManagerClient.raftInitialized()) { + if (clusterServer.isClusterLeader() + && clusterServer.raftInitialized() + && clusterClient.raftInitialized()) { LOGGER.info("wait {}(ms) found cluster leader", wait*3000); break; } @@ -77,19 +77,33 @@ public class ClusterManagerTest { e.printStackTrace(); } } - assertEquals(true, clusterManagerServer.isClusterLeader()); - LOGGER.info("initClusterEnv <<<"); + Thread.sleep(3000); + assertEquals(true, clusterServer.isClusterLeader()); + LOGGER.info("startCluster <<<"); + } + + @AfterClass + public static void stopCluster() { + if (null != clusterClient) { + clusterClient.shutdown(); + } + if (null != clusterClient) { + clusterServer.shutdown(); + } + LOGGER.info("stopCluster"); } @Test public void getServerMeta() { - LOGGER.info("serverMeta >>>"); + LOGGER.info("getServerMeta >>>"); // Get metadata for all services - Object meta = clusterManagerClient.getClusterMeta(ClusterMetaType.ServerMeta, ""); - + Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); LOGGER.info(meta.toString()); + Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); + LOGGER.info(intpMeta.toString()); + assertNotNull(meta); assertEquals(true, (meta instanceof HashMap)); HashMap hashMap = (HashMap) meta; @@ -101,7 +115,7 @@ public class ClusterManagerTest { assertEquals(true, mapMetaValues.size()>0); - LOGGER.info("serverMeta <<<"); + LOGGER.info("getServerMeta <<<"); } @Test @@ -110,8 +124,6 @@ public class ClusterManagerTest { HashMap meta = new HashMap<>(); meta.put(ClusterMeta.SERVER_HOST, zServerHost); meta.put(ClusterMeta.SERVER_PORT, zServerPort); - meta.put(ClusterMeta.SERVER_TSERVER_HOST, 
"SERVER_TSERVER_HOST"); - meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT"); meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST"); meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT"); meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY"); @@ -120,11 +132,11 @@ public class ClusterManagerTest { meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED"); // put IntpProcess Meta - clusterManagerClient.putClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey, meta); + clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta); // get IntpProcess Meta HashMap> check - = clusterManagerClient.getClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey); + = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey); LOGGER.info(check.toString()); diff --git a/zeppelin-plugins/launcher/cluster/pom.xml b/zeppelin-plugins/launcher/cluster/pom.xml new file mode 100644 index 0000000..80ca0b3 --- /dev/null +++ b/zeppelin-plugins/launcher/cluster/pom.xml @@ -0,0 +1,90 @@ + + + + + + 4.0.0 + + + zengine-plugins-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../../../zeppelin-plugins + + + org.apache.zeppelin + launcher-cluster + jar + 0.9.0-SNAPSHOT + Zeppelin: Plugin Cluster Launcher + Launcher implementation to run interpreters on cluster + + + Launcher/ClusterInterpreterLauncher + + + + + org.apache.zeppelin + launcher-standard + 0.9.0-SNAPSHOT + + + + + + + ${project.basedir}/src/test/resources + + + ${project.basedir}/src/main/resources + + + + + maven-enforcer-plugin + + + enforce + none + + + + + maven-dependency-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + false + + + + + + src/main/resources + + **/*.* + + + + + diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java new file mode 100644 index 
0000000..7d8ff1e --- /dev/null +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.zeppelin.cluster.ClusterManagerServer; +import org.apache.zeppelin.cluster.event.ClusterEvent; +import org.apache.zeppelin.cluster.event.ClusterEventListener; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterRunner; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS; +import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST; +import 
static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT; +import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST; +import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT; +import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META; + +/** + * Interpreter Launcher which use cluster to launch the interpreter process. + */ +public class ClusterInterpreterLauncher extends StandardInterpreterLauncher + implements ClusterEventListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncher.class); + + public static final int CHECK_META_INTERVAL = 500; // ms + private InterpreterLaunchContext context; + private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance(); + + public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) + throws IOException { + super(zConf, recoveryStorage); + clusterServer.addClusterEventListeners(this); + } + + @Override + public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); + + this.context = context; + this.properties = context.getProperties(); + int connectTimeout = getConnectTimeout(); + String intpGroupId = context.getInterpreterGroupId(); + + HashMap intpProcMeta = clusterServer + .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId); + if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST) + && intpProcMeta.containsKey(INTP_TSERVER_PORT)) { + // connect exist Interpreter Process + String intpTserverHost = (String) intpProcMeta.get(INTP_TSERVER_HOST); + int intpTserverPort = (int) intpProcMeta.get(INTP_TSERVER_PORT); + return new RemoteInterpreterRunningProcess( + context.getInterpreterSettingName(), + connectTimeout, + intpTserverHost, + intpTserverPort); + } else { + // No process was found for the InterpreterGroup ID + HashMap meta = 
clusterServer.getIdleNodeMeta(); + if (null == meta) { + LOGGER.error("Don't get idle node meta, launch interpreter on local."); + super.launch(context); + } + + String srvHost = (String) meta.get(SERVER_HOST); + String localhost = RemoteInterpreterUtils.findAvailableHostAddress(); + + if (localhost.equalsIgnoreCase(srvHost) && false) { + // launch interpreter on local + return super.launch(context); + } else { + int srvPort = (int) meta.get(SERVER_PORT); + + Gson gson = new Gson(); + String sContext = gson.toJson(context); + + Map mapEvent = new HashMap<>(); + mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS); + mapEvent.put(CLUSTER_EVENT_MSG, sContext); + String strEvent = gson.toJson(mapEvent); + clusterServer.unicastClusterEvent(srvHost, srvPort, strEvent); + + HashMap intpMeta = clusterServer + .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId); + int retryGetMeta = connectTimeout / CHECK_META_INTERVAL; + while ((retryGetMeta-- > 0) & + (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST) + || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) { + try { + Thread.sleep(CHECK_META_INTERVAL); + intpMeta = clusterServer + .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId); + LOGGER.warn("retry {} times to get {} meta!", retryGetMeta, intpGroupId); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + + // Check if the remote creation process is successful + if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST) + || !intpMeta.containsKey(INTP_TSERVER_PORT)) { + LOGGER.error("Creating process {} failed on remote server {}:{}", + intpGroupId, srvHost, srvPort); + + // launch interpreter on local + return super.launch(context); + } else { + // connnect remote interpreter process + String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST); + int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT); + return new RemoteInterpreterRunningProcess( + context.getInterpreterSettingName(), + 
connectTimeout, + intpTSrvHost, + intpTSrvPort); + } + } + } + } + + @Override + public void onClusterEvent(String event) { + Gson gson = new Gson(); + Map mapEvent = gson.fromJson(event, + new TypeToken>(){}.getType()); + String sEvent = (String) mapEvent.get(CLUSTER_EVENT); + ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent); + + switch (clusterEvent) { + case CREATE_INTP_PROCESS: + onCreateIntpProcess(mapEvent); + break; + default: + LOGGER.error("Unknown Cluster Event : {}", clusterEvent); + break; + } + } + + private void onCreateIntpProcess(Map mapEvent) { + String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG); + try { + Gson gson = new Gson(); + InterpreterLaunchContext context = gson.fromJson( + eventMsg, new TypeToken() {}.getType()); + + this.properties = context.getProperties(); + InterpreterOption option = context.getOption(); + InterpreterRunner runner = context.getRunner(); + String intpSetGroupName = context.getInterpreterSettingGroup(); + String intpSetName = context.getInterpreterSettingName(); + int connectTimeout = getConnectTimeout(); + String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + + context.getInterpreterSettingId(); + + ClusterInterpreterProcess clusterInterpreterProcess + = new ClusterInterpreterProcess( + runner != null ? 
runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), + context.getZeppelinServerRPCPort(), + context.getZeppelinServerHost(), + zConf.getInterpreterPortRange(), + zConf.getInterpreterDir() + "/" + intpSetGroupName, + localRepoPath, + buildEnvFromProperties(context), + connectTimeout, + intpSetName, + context.getInterpreterGroupId(), + option.isUserImpersonate()); + + clusterInterpreterProcess.start(context.getUserName()); + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } + } +} diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java new file mode 100644 index 0000000..986c2ed --- /dev/null +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java @@ -0,0 +1,141 @@ +package org.apache.zeppelin.interpreter.launcher; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.zeppelin.cluster.ClusterManagerServer; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST; +import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT; +import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META; + + +public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess { + private static final Logger LOGGER + = LoggerFactory.getLogger(ClusterInterpreterProcess.class); + + private String intpHost = ""; + private int intpPort = 0; + + private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance(); + + public ClusterInterpreterProcess( + String intpRunner, 
+ int zeppelinServerRPCPort, + String zeppelinServerRPCHost, + String interpreterPortRange, + String intpDir, + String localRepoDir, + Map env, + int connectTimeout, + String interpreterSettingName, + String interpreterGroupId, + boolean isUserImpersonated) { + + super(intpRunner, + zeppelinServerRPCPort, + zeppelinServerRPCHost, + interpreterPortRange, + intpDir, + localRepoDir, + env, + connectTimeout, + interpreterSettingName, + interpreterGroupId, + isUserImpersonated); + } + + @Override + public void start(String userName) throws IOException { + CheckIntpRunStatusThread checkIntpRunStatusThread = new CheckIntpRunStatusThread(this); + checkIntpRunStatusThread.start(); + + super.start(userName); + } + + @Override + public void processStarted(int port, String host) { + // Cluster mode, discovering interpreter processes through metadata registration + this.intpHost = host; + this.intpPort = port; + super.processStarted(port, host); + } + + @Override + public String getHost() { + return intpHost; + } + + @Override + public int getPort() { + return intpPort; + } + + @Override + public boolean isRunning() { + if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { + return true; + } + return false; + } + + @Override + public String getErrorMessage() { + return null; + } + + // Metadata registered in the cluster by the interpreter process, + // Keep the interpreter process started + private class CheckIntpRunStatusThread extends Thread { + private ClusterInterpreterProcess intpProcess; + + CheckIntpRunStatusThread(ClusterInterpreterProcess intpProcess) { + this.intpProcess = intpProcess; + } + + @Override + public void run() { + LOGGER.info("checkIntpRunStatusThread run() >>>"); + + String intpGroupId = getInterpreterGroupId(); + + HashMap intpMeta = clusterServer + .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId); + int connectTimeout = intpProcess.getConnectTimeout(); + int retryGetMeta = connectTimeout / 
ClusterInterpreterLauncher.CHECK_META_INTERVAL; + while ((retryGetMeta-- > 0) + && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST) + || !intpMeta.containsKey(INTP_TSERVER_PORT))) { + try { + Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL); + intpMeta = clusterServer + .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId); + LOGGER.info("retry {} times to get {} meta!", retryGetMeta, intpGroupId); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + + if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST) + && intpMeta.containsKey(INTP_TSERVER_PORT)) { + String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST); + int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT); + LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort); + + intpProcess.processStarted(intpPort, intpHost); + } + } + + if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST) + || !intpMeta.containsKey(INTP_TSERVER_PORT)) { + LOGGER.error("Can not found interpreter meta!"); + } + + LOGGER.info("checkIntpRunStatusThread run() <<<"); + } + } +} diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java new file mode 100644 index 0000000..da4bf5e --- /dev/null +++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ClusterInterpreterLauncherTest extends ClusterMockTest { + + @BeforeClass + public static void startTest() throws IOException, InterruptedException { + ClusterMockTest.startCluster(); + } + + @AfterClass + public static void stopTest() throws IOException, InterruptedException { + ClusterMockTest.stopCluster(); + } + + @Before + public void setUp() { + for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) { + System.clearProperty(confVar.getVarName()); + } + } + + @Test + public void testConnectExistIntpProcess() throws IOException { + mockIntpProcessMeta("intpGroupId"); + + ClusterInterpreterLauncher launcher + = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null); + Properties properties = new Properties(); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); + InterpreterOption option = new InterpreterOption(); + 
option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, + "user1", "intpGroupId", "groupId", + "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + + assertTrue(client instanceof RemoteInterpreterRunningProcess); + RemoteInterpreterRunningProcess interpreterProcess = (RemoteInterpreterRunningProcess) client; + assertEquals("INTP_TSERVER_HOST", interpreterProcess.getHost()); + assertEquals(0, interpreterProcess.getPort()); + assertEquals("name", interpreterProcess.getInterpreterSettingName()); + assertEquals(5000, interpreterProcess.getConnectTimeout()); + } + + @Test + public void testCreateIntpProcess() throws IOException { + ClusterInterpreterLauncher launcher + = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null); + Properties properties = new Properties(); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, + "user1", "intpGroupId", "groupId", + "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + + assertTrue(client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("name", interpreterProcess.getInterpreterSettingName()); + assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); + assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(5000, interpreterProcess.getConnectTimeout()); + assertEquals(zconf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 1); + assertEquals(true, interpreterProcess.isUserImpersonated()); + } +} 
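Both `ClusterInterpreterLauncher.launch` and `CheckIntpRunStatusThread.run` in the diff above wait for the interpreter process to register `INTP_TSERVER_HOST` and `INTP_TSERVER_PORT` in the cluster metadata, retrying `connectTimeout / CHECK_META_INTERVAL` times. The following is a minimal standalone sketch of that polling pattern, not code from this commit. The class `MetaPoller`, its method names, and the `Supplier` standing in for `clusterServer.getClusterMeta(...)` are all assumptions made for illustration, and it assumes `intervalMs` is greater than zero.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Zeppelin: polls a metadata source until an endpoint appears. */
final class MetaPoller {
  // Key names mirror the ClusterMeta constants in the diff; here they are plain placeholders.
  static final String HOST_KEY = "INTP_TSERVER_HOST";
  static final String PORT_KEY = "INTP_TSERVER_PORT";

  private MetaPoller() {}

  /** True once both the host and the port entries are present. */
  static boolean hasEndpoint(Map<String, Object> meta) {
    return meta != null && meta.containsKey(HOST_KEY) && meta.containsKey(PORT_KEY);
  }

  /**
   * Calls fetch at most (timeoutMs / intervalMs) + 1 times, sleeping intervalMs between calls.
   * Returns the first map that carries an endpoint, or null if none appeared in time
   * or the waiting thread was interrupted.
   */
  static Map<String, Object> pollForEndpoint(
      Supplier<Map<String, Object>> fetch, int timeoutMs, int intervalMs) {
    int retries = timeoutMs / intervalMs;
    Map<String, Object> meta = fetch.get();
    // && short-circuits, so the metadata check is skipped once the retries are used up.
    while (retries-- > 0 && !hasEndpoint(meta)) {
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting instead of looping on.
        Thread.currentThread().interrupt();
        return null;
      }
      meta = fetch.get();
    }
    return hasEndpoint(meta) ? meta : null;
  }
}
```

A caller would pass something like `() -> lookupMeta(intpGroupId)` as `fetch` (`lookupMeta` being a hypothetical wrapper) and treat a `null` result as the signal to fall back to a local launch. Returning `null` on interruption is a choice made for this sketch; the loops in the diff log the `InterruptedException` and keep retrying.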
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java similarity index 61% rename from zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java rename to zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java index ef4d7fd..72b51b4 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java +++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java @@ -1,4 +1,3 @@ -package org.apache.zeppelin.cluster; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,13 +14,14 @@ package org.apache.zeppelin.cluster; * See the License for the specific language governing permissions and * limitations under the License. 
*/ +package org.apache.zeppelin.interpreter.launcher; +import org.apache.zeppelin.cluster.ClusterManagerClient; +import org.apache.zeppelin.cluster.ClusterManagerServer; import org.apache.zeppelin.cluster.meta.ClusterMeta; import org.apache.zeppelin.cluster.meta.ClusterMetaType; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.junit.BeforeClass; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,21 +31,20 @@ import java.util.HashMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -public class ClusterManagerTest { - private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerTest.class); +public class ClusterMockTest { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterMockTest.class); - private static ClusterManagerServer clusterManagerServer = null; - private static ClusterManagerClient clusterManagerClient = null; + private static ClusterManagerServer clusterServer = null; + private static ClusterManagerClient clusterClient = null; - private static ZeppelinConfiguration zconf = null; + protected static ZeppelinConfiguration zconf = null; static String zServerHost; static int zServerPort; - static final String metaKey = "ClusterManagerTestKey"; + static final String metaKey = "ClusterMockKey"; - @BeforeClass - public static void initClusterEnv() throws IOException, InterruptedException { - LOGGER.info("initClusterEnv >>>"); + public static void startCluster() throws IOException, InterruptedException { + LOGGER.info("startCluster >>>"); zconf = ZeppelinConfiguration.create(); @@ -55,20 +54,20 @@ public class ClusterManagerTest { zconf.setClusterAddress(zServerHost + ":" + zServerPort); // mock cluster manager server - clusterManagerServer = ClusterManagerServer.getInstance(); - clusterManagerServer.start(null); + clusterServer = ClusterManagerServer.getInstance(); + 
clusterServer.start(); // mock cluster manager client - clusterManagerClient = ClusterManagerClient.getInstance(); - clusterManagerClient.start(metaKey); + clusterClient = ClusterManagerClient.getInstance(); + clusterClient.start(metaKey); // Waiting for cluster startup int wait = 0; - while(wait++ < 100) { - if (clusterManagerServer.isClusterLeader() - && clusterManagerServer.raftInitialized() - && clusterManagerClient.raftInitialized()) { - LOGGER.info("wait {}(ms) found cluster leader", wait*3000); + while (wait++ < 100) { + if (clusterServer.isClusterLeader() + && clusterServer.raftInitialized() + && clusterClient.raftInitialized()) { + LOGGER.info("wait {}(ms) found cluster leader", wait * 3000); break; } try { @@ -77,16 +76,27 @@ public class ClusterManagerTest { e.printStackTrace(); } } - assertEquals(true, clusterManagerServer.isClusterLeader()); - LOGGER.info("initClusterEnv <<<"); + assertEquals(true, clusterServer.isClusterLeader()); + + LOGGER.info("startCluster <<<"); + } + + public static void stopCluster() { + LOGGER.info("stopCluster >>>"); + if (null != clusterClient) { + clusterClient.shutdown(); + } + if (null != clusterServer) { + clusterServer.shutdown(); + } + LOGGER.info("stopCluster <<<"); } - @Test public void getServerMeta() { LOGGER.info("serverMeta >>>"); // Get metadata for all services - Object meta = clusterManagerClient.getClusterMeta(ClusterMetaType.ServerMeta, ""); + Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); LOGGER.info(meta.toString()); @@ -99,37 +109,34 @@ public class ClusterManagerTest { assertEquals(true, (values instanceof HashMap)); HashMap mapMetaValues = (HashMap) values; - assertEquals(true, mapMetaValues.size()>0); + assertEquals(true, mapMetaValues.size() > 0); LOGGER.info("serverMeta <<<"); } - @Test - public void putIntpProcessMeta() { + public void mockIntpProcessMeta(String metaKey) { // mock IntpProcess Meta HashMap meta = new HashMap<>(); - meta.put(ClusterMeta.SERVER_HOST, 
zServerHost); - meta.put(ClusterMeta.SERVER_PORT, zServerPort); - meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST"); - meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT"); + meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST"); + meta.put(ClusterMeta.SERVER_PORT, 0); meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST"); - meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT"); + meta.put(ClusterMeta.INTP_TSERVER_PORT, 0); meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY"); meta.put(ClusterMeta.CPU_USED, "CPU_USED"); meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY"); meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED"); // put IntpProcess Meta - clusterManagerClient.putClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey, meta); + clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta); // get IntpProcess Meta HashMap<String, HashMap<String, Object>> check - = clusterManagerClient.getClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey); + = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey); LOGGER.info(check.toString()); assertNotNull(check); assertNotNull(check.get(metaKey)); - assertEquals(true, check.get(metaKey).size()>0); + assertEquals(true, check.get(metaKey).size() == 8); } } diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml index 6c7aea9..727acd7 100644 --- a/zeppelin-plugins/pom.xml +++ b/zeppelin-plugins/pom.xml @@ -47,6 +47,7 @@ <module>launcher/standard</module> <module>launcher/k8s-standard</module> <module>launcher/spark</module> + <module>launcher/cluster</module> diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index cf9074f..1e180d8 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -31,7 +31,6 @@ import javax.servlet.ServletContextListener; import org.apache.commons.lang.StringUtils; import 
org.apache.shiro.web.env.EnvironmentLoaderListener; import org.apache.shiro.web.servlet.ShiroFilter; -import org.apache.zeppelin.cluster.ClusterManager; import org.apache.zeppelin.cluster.ClusterManagerServer; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; @@ -43,8 +42,8 @@ import org.apache.zeppelin.helium.HeliumBundleFactory; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; -import org.apache.zeppelin.interpreter.thrift.ClusterManagerService; import org.apache.zeppelin.notebook.NoteEventListener; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.AuthorizationService; @@ -53,6 +52,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.notebook.scheduler.NoSchedulerService; import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService; import org.apache.zeppelin.notebook.scheduler.SchedulerService; +import org.apache.zeppelin.plugin.PluginManager; import org.apache.zeppelin.rest.exception.WebApplicationExceptionMapper; import org.apache.zeppelin.search.LuceneSearch; import org.apache.zeppelin.search.SearchService; @@ -96,10 +96,10 @@ public class ZeppelinServer extends ResourceConfig { public static Server jettyWebServer; public static ServiceLocator sharedServiceLocator; + private static ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + @Inject public ZeppelinServer() { - ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT); packages("org.apache.zeppelin.rest"); @@ -162,10 +162,6 @@ public class ZeppelinServer extends ResourceConfig { 
.to(NoteEventListener.class) .to(WebSocketServlet.class) .in(Singleton.class); - bindAsContract(ClusterManagerServer.class) - .to(ClusterManager.class) - .to(ClusterManagerService.Iface.class) - .in(Singleton.class); if (conf.isZeppelinNotebookCronEnable()) { bind(QuartzSchedulerService.class).to(SchedulerService.class).in(Singleton.class); } else { @@ -356,9 +352,9 @@ public class ZeppelinServer extends ResourceConfig { } private static void setupClusterManagerServer(ServiceLocator serviceLocator) { - InterpreterFactory interpreterFactory - = sharedServiceLocator.getService(InterpreterFactory.class); - sharedServiceLocator.getService(ClusterManagerServer.class).start(interpreterFactory); + if (conf.isClusterMode()) { + ClusterManagerServer.getInstance().start(); + } } private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 8eebce4..2fbef23 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -670,6 +670,8 @@ public class InterpreterSetting { public String getLauncherPlugin() { if (isRunningOnKubernetes()) { return "K8sStandardInterpreterLauncher"; + } else if (isRunningOnCluster()) { + return "ClusterInterpreterLauncher"; } else { if (group.equals("spark")) { return "SparkInterpreterLauncher"; @@ -683,6 +685,10 @@ public class InterpreterSetting { return conf.getRunMode() == ZeppelinConfiguration.RUN_MODE.K8S; } + private boolean isRunningOnCluster() { + return conf.isClusterMode(); + } + public boolean isUserAuthorized(List userAndRoles) { if (!option.permissionIsSet()) { return true; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 02e8fd0..6d91f1d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -173,6 +173,10 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { return interpreterSettingName; } + public String getInterpreterGroupId() { + return interpreterGroupId; + } + @VisibleForTesting public String getInterpreterRunner() { return interpreterRunner;