Return-Path: X-Original-To: apmail-apex-commits-archive@minotaur.apache.org Delivered-To: apmail-apex-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75BD1196E4 for ; Sat, 9 Apr 2016 03:52:40 +0000 (UTC) Received: (qmail 11562 invoked by uid 500); 9 Apr 2016 03:52:40 -0000 Delivered-To: apmail-apex-commits-archive@apex.apache.org Received: (qmail 11529 invoked by uid 500); 9 Apr 2016 03:52:40 -0000 Mailing-List: contact commits-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list commits@apex.incubator.apache.org Received: (qmail 11520 invoked by uid 99); 9 Apr 2016 03:52:40 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 Apr 2016 03:52:40 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id C7FD6C023C for ; Sat, 9 Apr 2016 03:52:39 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id A65v30uR3kyu for ; Sat, 9 Apr 2016 03:52:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id AF29E5FAF2 for ; Sat, 9 Apr 2016 03:52:20 +0000 (UTC) Received: (qmail 10379 invoked by uid 99); 9 Apr 2016 03:52:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 Apr 2016 03:52:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 924EFE09C7; Sat, 9 Apr 2016 03:52:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thw@apache.org To: commits@apex.incubator.apache.org Date: Sat, 09 Apr 2016 03:52:33 -0000 Message-Id: In-Reply-To: <6d2c0bb1da5b47328340d962c1dcb2ed@git.apache.org> References: <6d2c0bb1da5b47328340d962c1dcb2ed@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/16] incubator-apex-core git commit: APEXCORE-423 style fix for apex-core engine http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StramUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java index c25480e..8b413bc 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java @@ -40,8 +40,7 @@ public abstract class StramUtils try { //return Class.forName(className).asSubclass(superClass); return Thread.currentThread().getContextClassLoader().loadClass(className).asSubclass(superClass); - } - catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Class not found: " + className, e); } } @@ -50,16 +49,14 @@ public abstract class StramUtils { try { return clazz.newInstance(); - } - catch (IllegalAccessException e) { + } catch (IllegalAccessException e) { throw new IllegalArgumentException("Failed to instantiate " + clazz, e); - } - catch (InstantiationException e) { + } catch (InstantiationException e) { throw new IllegalArgumentException("Failed to instantiate " + clazz, e); } } - public static abstract class YarnContainerMain + public abstract static class YarnContainerMain { static { // set system properties so they can be used in logger configuration @@ -72,7 +69,7 @@ public abstract class StramUtils System.setProperty("hadoop.log.file", "dt.log"); if (envs.get("CDH_YARN_HOME") != null) { // map logging properties to what CHD expects out of the box - String[] keys = new String[] { "log.dir", "log.file", "root.logger" }; + String[] keys = new String[]{"log.dir", "log.file", "root.logger"}; for (String key : keys) { String v = System.getProperty("hadoop." + key); if (v != null) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java index e060acf..7598b4f 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java @@ -21,6 +21,9 @@ package com.datatorrent.stram; import java.io.StringWriter; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -32,8 +35,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.datatorrent.stram.debug.StdOutErrLog; import com.datatorrent.stram.util.VersionInfo; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 075836a..1c7c893 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -123,7 +123,7 @@ public class StreamingAppMasterService extends CompositeService private static final long DELEGATION_TOKEN_MAX_LIFETIME = Long.MAX_VALUE / 2; private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2; private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000; - private static final int UPDATE_NODE_REPORTS_INTERVAL = 10* 60 * 1000; + private static final int UPDATE_NODE_REPORTS_INTERVAL = 10 * 60 * 1000; private AMRMClient amRmClient; private NMClientAsync nmClient; private LogicalPlan dag; @@ -145,7 +145,7 @@ public class StreamingAppMasterService extends CompositeService private final Map failedContainerNodesMap = Maps.newHashMap(); // Count of failed containers private final AtomicInteger numFailedContainers = new AtomicInteger(); - private final ConcurrentLinkedQueue pendingTasks = new ConcurrentLinkedQueue(); + private final ConcurrentLinkedQueue pendingTasks = new ConcurrentLinkedQueue<>(); // child container callback private StreamingContainerParent heartbeatListener; private StreamingContainerManager dnmgr; @@ -600,9 +600,9 @@ public class StreamingAppMasterService extends CompositeService // start web service try { org.mortbay.log.Log.setLog(null); - } - catch (Throwable throwable) { - // SPOI-2687. As part of Pivotal Certification, we need to catch ClassNotFoundException as Pivotal was using Jetty 7 where as other distros are using Jetty 6. + } catch (Throwable throwable) { + // SPOI-2687. As part of Pivotal Certification, we need to catch ClassNotFoundException as Pivotal was using + // Jetty 7 where as other distros are using Jetty 6. // LOG.error("can't set the log to null: ", throwable); } @@ -616,8 +616,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Started web service at port: " + webApp.port()); this.appMasterTrackingUrl = NetUtils.getConnectAddress(webApp.getListenerAddress()).getHostName() + ":" + webApp.port(); LOG.info("Setting tracking URL to: " + appMasterTrackingUrl); - } - catch (Exception e) { + } catch (Exception e) { LOG.error("Webapps failed to start. Ignoring for now:", e); } } @@ -646,8 +645,7 @@ public class StreamingAppMasterService extends CompositeService try { StreamingContainer.eventloop.start(); execute(); - } - finally { + } finally { StreamingContainer.eventloop.stop(); } return status; @@ -700,7 +698,7 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long nodeReportUpdateTime = 0; - List releasedContainers = new ArrayList(); + List releasedContainers = new ArrayList<>(); int numTotalContainers = 0; // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; @@ -723,7 +721,7 @@ public class StreamingAppMasterService extends CompositeService if (ar != null) { appDone = true; dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", - ar.getApplicationId().toString(), ar.getName(), ar.getUser()); + ar.getApplicationId().toString(), ar.getName(), ar.getUser()); LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); finishApplication(FinalApplicationStatus.FAILED, numTotalContainers); return; @@ -741,8 +739,8 @@ public class StreamingAppMasterService extends CompositeService checkContainerStatus(); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); while (!appDone) { loopCounter++; @@ -779,8 +777,8 @@ public class StreamingAppMasterService extends CompositeService } // Setup request to be sent to RM to allocate containers - List containerRequests = new ArrayList(); - List removedContainerRequests = new ArrayList(); + List containerRequests = new ArrayList<>(); + List removedContainerRequests = new ArrayList<>(); // request containers for pending deploy requests if (!dnmgr.containerStartRequests.isEmpty()) { @@ -826,7 +824,7 @@ public class StreamingAppMasterService extends CompositeService resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests); /* Remove nodes from blacklist after timeout */ - List blacklistRemovals = new ArrayList(); + List blacklistRemovals = new ArrayList<>(); for (String hostname : failedBlackListedNodes) { Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime; if (timeDiff >= blacklistRemovalTime) { @@ -895,8 +893,7 @@ public class StreamingAppMasterService extends CompositeService // allocated container no longer needed, add release request LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId()); releasedContainers.add(allocatedContainer.getId()); - } - else { + } else { AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer); this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder); ByteBuffer tokens = null; @@ -927,7 +924,7 @@ public class StreamingAppMasterService extends CompositeService // Check the completed containers List completedContainers = amResp.getCompletedContainersStatuses(); // LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size()); - List blacklistAdditions = new ArrayList(); + List blacklistAdditions = new ArrayList<>(); for (ContainerStatus containerStatus : completedContainers) { LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics()); @@ -983,8 +980,7 @@ public class StreamingAppMasterService extends CompositeService LOG.debug("Container {} failed or killed.", containerStatus.getContainerId()); dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString()); // } - } - else { + } else { // container completed successfully numCompletedContainers.incrementAndGet(); LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); @@ -1017,8 +1013,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); finalStatus = FinalApplicationStatus.FAILED; appDone = true; - } - else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) { + } else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) { LOG.debug("Exiting as no more containers are allocated or requested"); finalStatus = FinalApplicationStatus.SUCCEEDED; appDone = true; @@ -1058,7 +1053,7 @@ public class StreamingAppMasterService extends CompositeService { StramDelegationTokenIdentifier identifier = new StramDelegationTokenIdentifier(new Text(username), new Text(""), new Text("")); String service = address.getAddress().getHostAddress() + ":" + address.getPort(); - Token stramToken = new Token(identifier, delegationTokenManager); + Token stramToken = new Token<>(identifier, delegationTokenManager); stramToken.setService(new Text(service)); return stramToken; } @@ -1198,7 +1193,7 @@ public class StreamingAppMasterService extends CompositeService private class AllocatedContainer { - final private Container container; + private final Container container; private boolean stopRequested; private Token delegationToken; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 81dc96e..fc2fb17 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -19,17 +19,23 @@ package com.datatorrent.stram; import java.net.InetSocketAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import com.google.common.collect.Sets; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import com.google.common.collect.Sets; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.InputOperator; @@ -38,7 +44,6 @@ import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.StorageAgent; import com.datatorrent.api.StreamCodec; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.api.OperatorDeployInfo; import com.datatorrent.stram.api.OperatorDeployInfo.InputDeployInfo; @@ -67,19 +72,23 @@ import com.datatorrent.stram.webapp.ContainerInfo; * * @since 0.3.2 */ -public class StreamingContainerAgent { +public class StreamingContainerAgent +{ private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerAgent.class); - public static class ContainerStartRequest { + public static class ContainerStartRequest + { final PTContainer container; - ContainerStartRequest(PTContainer container) { + ContainerStartRequest(PTContainer container) + { this.container = container; } } - - public StreamingContainerAgent(PTContainer container, StreamingContainerContext initCtx, StreamingContainerManager dnmgr) { + public StreamingContainerAgent(PTContainer container, StreamingContainerContext initCtx, StreamingContainerManager + dnmgr) + { this.container = container; this.initCtx = initCtx; this.memoryMBFree = this.container.getAllocatedMemoryMB(); @@ -102,9 +111,10 @@ public class StreamingContainerAgent { long gcCollectionTime; final StreamingContainerManager dnmgr; - private final ConcurrentLinkedQueue operatorRequests = new ConcurrentLinkedQueue(); + private final ConcurrentLinkedQueue operatorRequests = new ConcurrentLinkedQueue<>(); - public StreamingContainerContext getInitContext() { + public StreamingContainerContext getInitContext() + { return initCtx; } @@ -113,7 +123,8 @@ public class StreamingContainerAgent { return container; } - public boolean hasPendingWork() { + public boolean hasPendingWork() + { for (PTOperator oper : container.getOperators()) { if (oper.getState() == PTOperator.State.PENDING_DEPLOY) { return true; @@ -122,29 +133,33 @@ public class StreamingContainerAgent { return false; } - public void addOperatorRequest(StramToNodeRequest r) { + public void addOperatorRequest(StramToNodeRequest r) + { LOG.info("Adding operator request {} {}", container.getExternalId(), r); this.operatorRequests.add(r); } @SuppressWarnings("ReturnOfCollectionOrArrayField") - protected ConcurrentLinkedQueue getOperatorRequests() { + protected ConcurrentLinkedQueue getOperatorRequests() + { return this.operatorRequests; } /** * Create deploy info for StramChild. + * * @param operators * @return StreamingContainerContext */ - public List getDeployInfoList(Collection operators) { + public List getDeployInfoList(Collection operators) + { if (container.bufferServerAddress == null) { throw new AssertionError("No buffer server address assigned"); } - Map nodes = new LinkedHashMap(); - HashSet publishers = new HashSet(); + Map nodes = new LinkedHashMap<>(); + HashSet publishers = new HashSet<>(); PhysicalPlan physicalPlan = dnmgr.getPhysicalPlan(); @@ -156,8 +171,8 @@ public class StreamingContainerAgent { OperatorDeployInfo ndi = createOperatorDeployInfo(oper); nodes.put(ndi, oper); - ndi.inputs = new ArrayList(oper.getInputs().size()); - ndi.outputs = new ArrayList(oper.getOutputs().size()); + ndi.inputs = new ArrayList<>(oper.getInputs().size()); + ndi.outputs = new ArrayList<>(oper.getOutputs().size()); for (PTOperator.PTOutput out : oper.getOutputs()) { final StreamMeta streamMeta = out.logicalStream; @@ -168,8 +183,7 @@ public class StreamingContainerAgent { try { portInfo.contextAttributes = streamMeta.getSource().getAttributes().clone(); - } - catch (CloneNotSupportedException ex) { + } catch (CloneNotSupportedException ex) { throw new RuntimeException("Cannot clone attributes", ex); } @@ -279,7 +293,7 @@ public class StreamingContainerAgent { } } - return new ArrayList(nodes.keySet()); + return new ArrayList<>(nodes.keySet()); } public static InputPortMeta getInputPortMeta(LogicalPlan.OperatorMeta operatorMeta, StreamMeta streamMeta) @@ -349,7 +363,6 @@ public class StreamingContainerAgent { *

* * @return {@link com.datatorrent.stram.api.OperatorDeployInfo} - * */ private OperatorDeployInfo createOperatorDeployInfo(PTOperator oper) { @@ -359,13 +372,11 @@ public class StreamingContainerAgent { UnifierDeployInfo udi = new UnifierDeployInfo(); /* the constructor auto sets the type */ try { udi.operatorAttributes = oper.getUnifiedOperatorMeta().getAttributes().clone(); - } - catch (CloneNotSupportedException ex) { + } catch (CloneNotSupportedException ex) { throw new RuntimeException("Cannot clone unifier attributes", ex); } ndi = udi; - } - else { + } else { ndi = new OperatorDeployInfo(); Operator operator = oper.getOperatorMeta().getOperator(); if (operator instanceof InputOperator) { @@ -374,15 +385,14 @@ public class StreamingContainerAgent { if (!oper.getInputs().isEmpty()) { //If there are no input ports then it has to be an input operator. But if there are input ports then //we check if any input port is connected which would make it a Generic operator. - for (PTOperator.PTInput ptInput: oper.getInputs()) { + for (PTOperator.PTInput ptInput : oper.getInputs()) { if (ptInput.logicalStream != null && ptInput.logicalStream.getSource() != null) { ndi.type = OperatorType.GENERIC; break; } } } - } - else { + } else { ndi.type = OperatorType.GENERIC; } } @@ -410,8 +420,7 @@ public class StreamingContainerAgent { if (checkpoint == null || checkpoint.windowId != checkpointId) { checkpoint = new Checkpoint(checkpointId, 0, 0); } - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException("Failed to determine checkpoint window id " + oper, e); } } @@ -423,8 +432,7 @@ public class StreamingContainerAgent { try { // clone map before modifying it ndi.contextAttributes = oper.getOperatorMeta().getAttributes().clone(); - } - catch (CloneNotSupportedException ex) { + } catch (CloneNotSupportedException ex) { throw new RuntimeException("Cannot clone operator attributes", ex); } if (oper.isOperatorStateLess()) { @@ -433,7 +441,8 @@ public class StreamingContainerAgent { return ndi; } - public ContainerInfo getContainerInfo() { + public ContainerInfo getContainerInfo() + { ContainerInfo ci = new ContainerInfo(); ci.id = container.getExternalId(); ci.host = container.host; @@ -449,8 +458,12 @@ public class StreamingContainerAgent { ci.finishedTime = container.getFinishedTime(); if (this.container.nodeHttpAddress != null) { YarnConfiguration conf = new YarnConfiguration(); - ci.containerLogsUrl = ConfigUtils.getSchemePrefix(conf) + this.container.nodeHttpAddress + "/node/containerlogs/" + ci.id + "/" + System.getenv(ApplicationConstants.Environment.USER.toString()); - ci.rawContainerLogsUrl = ConfigUtils.getRawContainerLogsUrl(conf, container.nodeHttpAddress, container.getPlan().getLogicalPlan().getAttributes().get(LogicalPlan.APPLICATION_ID), ci.id); + ci.containerLogsUrl = ConfigUtils + .getSchemePrefix(conf) + this.container.nodeHttpAddress + "/node/containerlogs/" + ci.id + "/" + System + .getenv(ApplicationConstants.Environment.USER.toString()); + ci.rawContainerLogsUrl = ConfigUtils + .getRawContainerLogsUrl(conf, container.nodeHttpAddress, container.getPlan().getLogicalPlan().getAttributes() + .get(LogicalPlan.APPLICATION_ID), ci.id); } return ci; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index eb3fb96..9e184e7 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -18,34 +18,47 @@ */ package com.datatorrent.stram; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; import java.lang.management.ManagementFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.URI; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; -import com.datatorrent.netlet.util.DTThrowable; -import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.base.Predicate; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.config.BusConfiguration; - import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -72,13 +85,33 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.webapp.NotFoundException; -import com.datatorrent.api.*; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StorageAgent; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.StringCodec; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.bufferserver.auth.AuthManager; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.common.experimental.AppData; @@ -88,8 +121,21 @@ import com.datatorrent.common.util.NumberAggregate; import com.datatorrent.common.util.Pair; import com.datatorrent.stram.Journal.Recoverable; import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest; -import com.datatorrent.stram.api.*; -import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.*; +import com.datatorrent.stram.api.AppDataSource; +import com.datatorrent.stram.api.Checkpoint; +import com.datatorrent.stram.api.ContainerContext; +import com.datatorrent.stram.api.OperatorDeployInfo; +import com.datatorrent.stram.api.StramEvent; +import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest; +import com.datatorrent.stram.api.StramToNodeGetPropertyRequest; +import com.datatorrent.stram.api.StramToNodeSetPropertyRequest; +import com.datatorrent.stram.api.StramToNodeStartRecordingRequest; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext; import com.datatorrent.stram.engine.OperatorResponse; import com.datatorrent.stram.engine.StreamingContainer; import com.datatorrent.stram.engine.WindowGenerator; @@ -103,18 +149,30 @@ import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; import com.datatorrent.stram.plan.logical.Operators; import com.datatorrent.stram.plan.logical.Operators.PortContextPair; import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest; -import com.datatorrent.stram.plan.physical.*; +import com.datatorrent.stram.plan.physical.OperatorStatus; import com.datatorrent.stram.plan.physical.OperatorStatus.PortStatus; +import com.datatorrent.stram.plan.physical.PTContainer; +import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.plan.physical.PTOperator.PTInput; import com.datatorrent.stram.plan.physical.PTOperator.PTOutput; import com.datatorrent.stram.plan.physical.PTOperator.State; +import com.datatorrent.stram.plan.physical.PhysicalPlan; import com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext; +import com.datatorrent.stram.plan.physical.PlanModifier; import com.datatorrent.stram.util.ConfigUtils; import com.datatorrent.stram.util.FSJsonLineFile; import com.datatorrent.stram.util.MovingAverage.MovingAverageLong; import com.datatorrent.stram.util.SharedPubSubWebSocketClient; import com.datatorrent.stram.util.WebServicesClient; -import com.datatorrent.stram.webapp.*; +import com.datatorrent.stram.webapp.ContainerInfo; +import com.datatorrent.stram.webapp.LogicalOperatorInfo; +import com.datatorrent.stram.webapp.OperatorAggregationInfo; +import com.datatorrent.stram.webapp.OperatorInfo; +import com.datatorrent.stram.webapp.PortInfo; +import com.datatorrent.stram.webapp.StreamInfo; + +import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.bus.config.BusConfiguration; /** * Tracks topology provisioning/allocation to containers

@@ -129,20 +187,19 @@ import com.datatorrent.stram.webapp.*; */ public class StreamingContainerManager implements PlanContext { - private final static Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class); - public final static String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login"; - public final static String BUILTIN_APPDATA_URL = "builtin"; - public final static String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json"; - public final static String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json"; - public final static String APP_META_FILENAME = "meta.json"; - public final static String APP_META_KEY_ATTRIBUTES = "attributes"; - public final static String APP_META_KEY_METRICS = "metrics"; + private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class); + public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login"; + public static final String BUILTIN_APPDATA_URL = "builtin"; + public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json"; + public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json"; + public static final String APP_META_FILENAME = "meta.json"; + public static final String APP_META_KEY_ATTRIBUTES = "attributes"; + public static final String APP_META_KEY_METRICS = "metrics"; public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query"; - public final static long LATENCY_WARNING_THRESHOLD_MILLIS = 10 * 60 * 1000; // 10 minutes - public final static Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty(); - public final static Recoverable SET_PHYSICAL_OPERATOR_PROPERTY = new SetPhysicalOperatorProperty(); - public final static int METRIC_QUEUE_SIZE = 1000; + public static final Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty(); + public static final Recoverable SET_PHYSICAL_OPERATOR_PROPERTY = new SetPhysicalOperatorProperty(); + public static final int METRIC_QUEUE_SIZE = 1000; private final FinalVars vars; private final PhysicalPlan plan; @@ -150,27 +207,27 @@ public class StreamingContainerManager implements PlanContext private SharedPubSubWebSocketClient wsClient; private FSStatsRecorder statsRecorder; private FSEventRecorder eventRecorder; - protected final Map containerStopRequests = new ConcurrentHashMap(); - protected final ConcurrentLinkedQueue containerStartRequests = new ConcurrentLinkedQueue(); + protected final Map containerStopRequests = new ConcurrentHashMap<>(); + protected final ConcurrentLinkedQueue containerStartRequests = new ConcurrentLinkedQueue<>(); protected boolean forcedShutdown = false; - private final ConcurrentLinkedQueue eventQueue = new ConcurrentLinkedQueue(); + private final ConcurrentLinkedQueue eventQueue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean eventQueueProcessing = new AtomicBoolean(); private final HashSet pendingAllocation = Sets.newLinkedHashSet(); protected String shutdownDiagnosticsMessage = ""; private long lastResourceRequest = 0; - private final Map containers = new ConcurrentHashMap(); - private final List> purgeCheckpoints = new ArrayList>(); + private final Map containers = new ConcurrentHashMap<>(); + private final List> purgeCheckpoints = new ArrayList<>(); private Map> checkpointGroups; private final Map> shutdownOperators = new HashMap<>(); private CriticalPathInfo criticalPathInfo; - private final ConcurrentMap reportStats = Maps.newConcurrentMap(); + private final ConcurrentMap reportStats = new ConcurrentHashMap<>(); private final AtomicBoolean deployChangeInProgress = new AtomicBoolean(); private int deployChangeCnt; private MBassador eventBus; // event bus for publishing stram events - final private Journal journal; + private final Journal journal; private RecoveryHandler recoveryHandler; // window id to node id to end window stats - private final ConcurrentSkipListMap> endWindowStatsOperatorMap = new ConcurrentSkipListMap>(); + private final ConcurrentSkipListMap> endWindowStatsOperatorMap = new ConcurrentSkipListMap<>(); private final ConcurrentMap slowestUpstreamOp = new ConcurrentHashMap<>(); private long committedWindowId; // (operator id, port name) to timestamp @@ -179,12 +236,11 @@ public class StreamingContainerManager implements PlanContext private long lastStatsTimestamp = System.currentTimeMillis(); private long currentEndWindowStatsWindowId; private long completeEndWindowStatsWindowId; - private final ConcurrentHashMap rpcLatencies = new ConcurrentHashMap(); + private final ConcurrentHashMap rpcLatencies = new ConcurrentHashMap<>(); private final AtomicLong nodeToStramRequestIds = new AtomicLong(1); private int allocatedMemoryMB = 0; private List appDataSources = null; private final Cache commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build(); - private long lastLatencyWarningTime; private transient ExecutorService poolExecutor; private FileContext fileContext; @@ -223,7 +279,7 @@ public class StreamingContainerManager implements PlanContext private static class EndWindowStats { long emitTimestamp = -1; - HashMap dequeueTimestamps = new HashMap(); // input port name to end window dequeue time + HashMap dequeueTimestamps = new HashMap<>(); // input port name to end window dequeue time Object counters; Map metrics; } @@ -253,9 +309,9 @@ public class StreamingContainerManager implements PlanContext private static class SetOperatorProperty implements Recoverable { - final private String operatorName; - final private String propertyName; - final private String propertyValue; + private final String operatorName; + private final String propertyName; + private final String propertyValue; private SetOperatorProperty() { @@ -298,9 +354,9 @@ public class StreamingContainerManager implements PlanContext private static class SetPhysicalOperatorProperty implements Recoverable { - final private int operatorId; - final private String propertyName; - final private String propertyValue; + private final int operatorId; + private final String propertyName; + private final String propertyValue; private SetPhysicalOperatorProperty() { @@ -357,7 +413,7 @@ public class StreamingContainerManager implements PlanContext poolExecutor = Executors.newFixedThreadPool(4); // setup prior to plan creation for event recording if (enableEventRecording) { - this.eventBus = new MBassador(BusConfiguration.Default(1, 1, 1)); + this.eventBus = new MBassador<>(BusConfiguration.Default(1, 1, 1)); } this.plan = new PhysicalPlan(dag, this); this.journal = new Journal(this); @@ -370,7 +426,7 @@ public class StreamingContainerManager implements PlanContext this.clock = new SystemClock(); poolExecutor = Executors.newFixedThreadPool(4); this.plan = checkpointedState.physicalPlan; - this.eventBus = new MBassador(BusConfiguration.Default(1, 1, 1)); + this.eventBus = new MBassador<>(BusConfiguration.Default(1, 1, 1)); this.journal = new Journal(this); init(enableEventRecording); } @@ -393,11 +449,12 @@ public class StreamingContainerManager implements PlanContext fileName = String.format(OPERATORS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID)); this.operatorFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault()); } catch (IOException ex) { - throw DTThrowable.wrapIfChecked(ex); + throw Throwables.propagate(ex); } } - public Journal getJournal() { + public Journal getJournal() + { return journal; } @@ -431,8 +488,7 @@ public class StreamingContainerManager implements PlanContext } else { LOG.warn("Could not determine the memory allocated for the streaming application master. Node manager is reporting {} MB from {}", totalMemoryNeededMB, url); } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.warn("Could not determine the memory allocated for the streaming application master", ex); } } @@ -501,8 +557,7 @@ public class StreamingContainerManager implements PlanContext wsClient.setPassword(gatewayPassword); } wsClient.setup(); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.warn("Cannot establish websocket connection to {}", gatewayAddress, ex); } } @@ -522,7 +577,7 @@ public class StreamingContainerManager implements PlanContext IOUtils.closeQuietly(containerFile); IOUtils.closeQuietly(operatorFile); - if(poolExecutor != null) { + if (poolExecutor != null) { poolExecutor.shutdown(); } } @@ -582,7 +637,7 @@ public class StreamingContainerManager implements PlanContext { synchronized (appDataSourcesLock) { if (appDataSources == null) { - appDataSources = new ArrayList(); + appDataSources = new ArrayList<>(); operators: for (LogicalPlan.OperatorMeta operatorMeta : plan.getLogicalPlan().getAllOperators()) { Map inputStreams = operatorMeta.getInputStreams(); @@ -608,7 +663,7 @@ public class StreamingContainerManager implements PlanContext } //Discover separate query operators - LOG.warn("DEBUG: looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId()); + LOG.debug("Looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId()); for (Map.Entry entry : inputStreams.entrySet()) { LogicalPlan.InputPortMeta portMeta = entry.getKey(); if (portMeta.isAppDataQueryPort()) { @@ -621,9 +676,8 @@ public class StreamingContainerManager implements PlanContext queryUrl = queryOperator.getAppDataURL(); queryTopic = queryOperator.getTopic(); } else { - LOG.warn("An embeddable query connector and the {} query operator were discovered. " + - "The query operator will be ignored and the embeddable query connector will be used instead.", - operatorMeta.getName()); + LOG.warn("An embeddable query connector and the {} query operator were discovered. The query operator will be ignored and the embeddable query connector will be used instead.", + operatorMeta.getName()); } } } else { @@ -635,7 +689,7 @@ public class StreamingContainerManager implements PlanContext for (Map.Entry entry : outputStreams.entrySet()) { LogicalPlan.OutputPortMeta portMeta = entry.getKey(); - LOG.warn("DEBUG: looking at port {} {}", portMeta.getPortName(), Thread.currentThread().getId()); + LOG.debug("Looking at port {} {}", portMeta.getPortName(), Thread.currentThread().getId()); if (portMeta.isAppDataResultPort()) { AppDataSource appDataSource = new AppDataSource(); @@ -661,7 +715,7 @@ public class StreamingContainerManager implements PlanContext } OperatorMeta resultOperatorMeta = sinks.get(0).getOperatorWrapper(); if (resultOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) { - AppData.ConnectionInfoProvider resultOperator = (AppData.ConnectionInfoProvider) resultOperatorMeta.getOperator(); + AppData.ConnectionInfoProvider resultOperator = (AppData.ConnectionInfoProvider)resultOperatorMeta.getOperator(); appDataSource.setResultOperatorName(resultOperatorMeta.getName()); appDataSource.setResultTopic(resultOperator.getTopic()); appDataSource.setResultUrl(convertAppDataUrl(resultOperator.getAppDataURL())); @@ -673,7 +727,7 @@ public class StreamingContainerManager implements PlanContext LOG.warn("Result operator for the App Data Source {}.{} does not implement the right interface. Ignoring the App Data Source.", operatorMeta.getName(), portMeta.getPortName()); continue; } - LOG.warn("DEBUG: Adding appDataSource {} {}", appDataSource.getName(), Thread.currentThread().getId()); + LOG.debug("Adding appDataSource {} {}", appDataSource.getName(), Thread.currentThread().getId()); appDataSources.add(appDataSource); } } @@ -708,8 +762,7 @@ public class StreamingContainerManager implements PlanContext } shutdownAllContainers(msg); this.forcedShutdown = true; - } - else { + } else { for (PTContainer c : pendingAllocation) { LOG.debug("Waiting for resource: {}m {}", c.getRequiredMemoryMB(), c); } @@ -728,8 +781,7 @@ public class StreamingContainerManager implements PlanContext LOG.info("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis); containerStopRequests.put(c.getExternalId(), c.getExternalId()); } - } - else { + } else { if (currentTms - sca.lastHeartbeatMillis > this.vars.heartbeatTimeoutMillis) { if (!isApplicationIdle()) { // request stop (kill) as process may still be hanging around (would have been detected by Yarn otherwise) @@ -756,8 +808,7 @@ public class StreamingContainerManager implements PlanContext try { statsRecorder.recordContainers(containers, currentTms); statsRecorder.recordOperators(getOperatorInfoList(), currentTms); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.warn("Exception caught when recording stats", ex); } } @@ -771,8 +822,8 @@ public class StreamingContainerManager implements PlanContext LOG.warn("Some operators are behind for more than {} windows! Trimming the end window stats map", this.vars.maxWindowsBehindForStats); while (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) { LOG.debug("Removing incomplete end window stats for window id {}. Collected operator set: {}. Complete set: {}", - endWindowStatsOperatorMap.firstKey(), - endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allCurrentOperators); + endWindowStatsOperatorMap.firstKey(), + endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allCurrentOperators); endWindowStatsOperatorMap.remove(endWindowStatsOperatorMap.firstKey()); } } @@ -791,17 +842,14 @@ public class StreamingContainerManager implements PlanContext if (windowId < completeEndWindowStatsWindowId) { LOG.debug("Disregarding stale end window stats for window {}", windowId); endWindowStatsOperatorMap.remove(windowId); - } - else { + } else { break; } - } - else { + } else { endWindowStatsOperatorMap.remove(windowId); currentEndWindowStatsWindowId = windowId; } - } - else { + } else { // the old stats contains operators that do not exist any more // this is probably right after a partition happens. LOG.debug("Stats for non-existent operators detected. Disregarding end window stats for window {}", windowId); @@ -839,7 +887,7 @@ public class StreamingContainerManager implements PlanContext for (OperatorMeta operatorMeta : logicalOperators) { AutoMetric.Aggregator aggregator = operatorMeta.getMetricAggregatorMeta() != null ? - operatorMeta.getMetricAggregatorMeta().getAggregator() : null; + operatorMeta.getMetricAggregatorMeta().getAggregator() : null; if (aggregator == null) { continue; } @@ -878,7 +926,7 @@ public class StreamingContainerManager implements PlanContext logicalMetrics.put(operatorMeta.getName(), windowMetrics); } LOG.debug("Adding to logical metrics for {}", operatorMeta.getName()); - windowMetrics.add(new Pair>(windowId, lm)); + windowMetrics.add(new Pair<>(windowId, lm)); Map oldValue = latestLogicalMetrics.put(operatorMeta.getName(), lm); if (oldValue == null) { try { @@ -898,7 +946,8 @@ public class StreamingContainerManager implements PlanContext private void saveMetaInfo() throws IOException { Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime()); - try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) { JSONObject top = new JSONObject(); + try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) { + JSONObject top = new JSONObject(); JSONObject attributes = new JSONObject(); for (Map.Entry, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) { attributes.put(entry.getKey().getSimpleName(), entry.getValue()); @@ -1244,8 +1293,7 @@ public class StreamingContainerManager implements PlanContext scc.attributes.put(ContainerContext.BUFFER_SERVER_TOKEN, container.getBufferServerToken()); scc.startWindowMillis = this.vars.windowStartMillis; return scc; - } - catch (CloneNotSupportedException ex) { + } catch (CloneNotSupportedException ex) { throw new RuntimeException("Cannot clone DAG attributes", ex); } } @@ -1282,15 +1330,14 @@ public class StreamingContainerManager implements PlanContext // operator expected active, check remote status if (ds == null) { sca.deployOpers.add(oper); - } - else { + } else { switch (ds) { case SHUTDOWN: // schedule operator deactivation against the windowId // will be processed once window is committed and all dependent operators completed processing long windowId = oper.stats.currentWindowId.get(); if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) { - windowId = ohb.windowStats.get(ohb.windowStats.size()-1).windowId; + windowId = ohb.windowStats.get(ohb.windowStats.size() - 1).windowId; } LOG.debug("Operator {} deactivated at window {}", oper, windowId); synchronized (this.shutdownOperators) { @@ -1312,6 +1359,7 @@ public class StreamingContainerManager implements PlanContext recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); break; case ACTIVE: + default: break; } } @@ -1322,8 +1370,7 @@ public class StreamingContainerManager implements PlanContext recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); oper.setState(State.PENDING_DEPLOY); sca.deployOpers.add(oper); - } - else { + } else { // operator is currently deployed, request undeploy sca.undeployOpers.add(oper.getId()); slowestUpstreamOp.remove(oper); @@ -1333,8 +1380,7 @@ public class StreamingContainerManager implements PlanContext if (ds == null) { // operator to be deployed sca.deployOpers.add(oper); - } - else { + } else { // operator was deployed in container PTContainer container = oper.getContainer(); LOG.debug("{} marking deployed: {} remote status {}", container.getExternalId(), oper, ds); @@ -1371,15 +1417,13 @@ public class StreamingContainerManager implements PlanContext // existing container or sandbox container for just the operator LOG.error("Initiating container restart after operator failure {}", oper); containerStopRequests.put(oper.getContainer().getExternalId(), oper.getContainer().getExternalId()); - } - else { + } else { String msg = String.format("Shutdown after reaching failure threshold for %s", oper); LOG.warn(msg); shutdownAllContainers(msg); forcedShutdown = true; } - } - else { + } else { // should not get here LOG.warn("Failed operator {} {} {} to be undeployed by container", oper, oper.getState()); } @@ -1476,10 +1520,10 @@ public class StreamingContainerManager implements PlanContext if (shb.requestResponse != null) { for (StatsListener.OperatorResponse obj : shb.requestResponse) { if (obj instanceof OperatorResponse) { // This is to identify platform requests - commandResponse.put((Long) obj.getResponseId(), obj.getResponse()); + commandResponse.put((Long)obj.getResponseId(), obj.getResponse()); LOG.debug(" Got back the response {} for the request {}", obj, obj.getResponseId()); - } - else { // This is to identify user requests + } else { + // This is to identify user requests oper.stats.responses.add(obj); } } @@ -1520,7 +1564,7 @@ public class StreamingContainerManager implements PlanContext /* report checkpoint-ed WindowId status of the operator */ if (stats.checkpoint instanceof Checkpoint) { if (oper.getRecentCheckpoint() == null || oper.getRecentCheckpoint().windowId < stats.checkpoint.getWindowId()) { - addCheckpoint(oper, (Checkpoint) stats.checkpoint); + addCheckpoint(oper, (Checkpoint)stats.checkpoint); if (stats.checkpointStats != null) { status.checkpointStats = stats.checkpointStats; status.checkpointTimeMA.add(stats.checkpointStats.checkpointTime); @@ -1641,14 +1685,14 @@ public class StreamingContainerManager implements PlanContext endWindowStats.counters = stats.counters; } if (oper.getOperatorMeta().getMetricAggregatorMeta() != null && - oper.getOperatorMeta().getMetricAggregatorMeta().getAggregator() != null) { + oper.getOperatorMeta().getMetricAggregatorMeta().getAggregator() != null) { endWindowStats.metrics = stats.metrics; } if (stats.windowId > currentEndWindowStatsWindowId) { Map endWindowStatsMap = endWindowStatsOperatorMap.get(stats.windowId); if (endWindowStatsMap == null) { - endWindowStatsMap = new ConcurrentSkipListMap(); + endWindowStatsMap = new ConcurrentSkipListMap<>(); Map endWindowStatsMapPrevious = endWindowStatsOperatorMap.putIfAbsent(stats.windowId, endWindowStatsMap); if (endWindowStatsMapPrevious != null) { @@ -1729,9 +1773,9 @@ public class StreamingContainerManager implements PlanContext } status.tuplesProcessedPSMA.set(Math.round(tuplesProcessedPMSMA * 1000)); status.tuplesEmittedPSMA.set(Math.round(tuplesEmittedPMSMA * 1000)); - } - else { - //LOG.warn("This timestamp for {} is lower than the previous!! {} < {}", oper.getId(), maxEndWindowTimestamp, lastMaxEndWindowTimestamp); + } else { + //LOG.warn("This timestamp for {} is lower than the previous!! {} < {}", oper.getId(), + // maxEndWindowTimestamp, lastMaxEndWindowTimestamp); } operatorLastEndWindowTimestamps.put(oper.getId(), maxEndWindowTimestamp); status.listenerStats.add(statsList); @@ -1863,13 +1907,11 @@ public class StreamingContainerManager implements PlanContext if (li.previous().windowId != checkpoint.windowId) { li.add(checkpoint); } - } - else { + } else { node.checkpoints.add(checkpoint); } } - } - else { + } else { node.checkpoints.add(checkpoint); } } @@ -1878,8 +1920,8 @@ public class StreamingContainerManager implements PlanContext public static class UpdateCheckpointsContext { public final MutableLong committedWindowId = new MutableLong(Long.MAX_VALUE); - public final Set visited = new LinkedHashSet(); - public final Set blocked = new LinkedHashSet(); + public final Set visited = new LinkedHashSet<>(); + public final Set blocked = new LinkedHashSet<>(); public final long currentTms; public final boolean recovery; public final Map> checkpointGroups; @@ -2017,11 +2059,10 @@ public class StreamingContainerManager implements PlanContext while (checkpoints.size() > 1 && ((c2 = checkpoints.get(1)).windowId) <= maxCheckpoint.windowId) { checkpoints.removeFirst(); //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1); - this.purgeCheckpoints.add(new Pair(groupOper, c1.windowId)); + this.purgeCheckpoints.add(new Pair<>(groupOper, c1.windowId)); c1 = c2; } - } - else { + } else { if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) { LOG.debug("Adding checkpoint for stateless operator {} {}", groupOper, Codec.getStringWindowId(maxCheckpoint.windowId)); c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis); @@ -2030,8 +2071,7 @@ public class StreamingContainerManager implements PlanContext } //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints}); groupOper.setRecoveryCheckpoint(c1); - } - else { + } else { LOG.debug("Skipping checkpoint update {} during {}", groupOper, groupOper.getState()); } } @@ -2118,8 +2158,7 @@ public class StreamingContainerManager implements PlanContext { try { operator.getOperatorMeta().getValue(OperatorContext.STORAGE_AGENT).delete(operator.getId(), windowId); - } - catch (IOException ex) { + } catch (IOException ex) { LOG.error("Failed to purge checkpoint for operator {} for windowId {}", operator, windowId, ex); } } @@ -2133,7 +2172,7 @@ public class StreamingContainerManager implements PlanContext // address should be null only for a new container, in which case there should not be a purge request // TODO: logging added to find out how we got here LOG.warn("purge request w/o buffer server address source {} container {} checkpoints {}", - out, operator.getContainer(), operator.checkpoints); + out, operator.getContainer(), operator.checkpoints); continue; } @@ -2146,8 +2185,7 @@ public class StreamingContainerManager implements PlanContext BufferServerController bsc = getBufferServerClient(operator); try { bsc.purge(null, sourceIdentifier, operator.checkpoints.getFirst().windowId - 1); - } - catch (RuntimeException re) { + } catch (RuntimeException re) { LOG.warn("Failed to purge {} {}", bsc.addr, sourceIdentifier, re); } } @@ -2176,11 +2214,11 @@ public class StreamingContainerManager implements PlanContext private Map> groupByContainer(Collection operators) { - Map> m = new HashMap>(); + Map> m = new HashMap<>(); for (PTOperator node : operators) { List nodes = m.get(node.getContainer()); if (nodes == null) { - nodes = new ArrayList(); + nodes = new ArrayList<>(); m.put(node.getContainer(), nodes); } nodes.add(node); @@ -2247,8 +2285,7 @@ public class StreamingContainerManager implements PlanContext // ensures new subscriber starting to read from checkpoint will wait until publisher redeploy cycle is complete try { bsc.reset(null, sourceIdentifier, 0); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Failed to reset buffer server {} {}", sourceIdentifier, ex); } } @@ -2283,8 +2320,7 @@ public class StreamingContainerManager implements PlanContext } } - } - finally { + } finally { this.deployChangeCnt++; this.deployChangeInProgress.set(false); } @@ -2312,7 +2348,7 @@ public class StreamingContainerManager implements PlanContext public List getOperatorInfoList() { - List infoList = new ArrayList(); + List infoList = new ArrayList<>(); for (PTContainer container : this.plan.getContainers()) { for (PTOperator operator : container.getOperators()) { @@ -2352,7 +2388,7 @@ public class StreamingContainerManager implements PlanContext public List getLogicalOperatorInfoList() { - List infoList = new ArrayList(); + List infoList = new ArrayList<>(); Collection allOperators = getLogicalPlan().getAllOperators(); for (OperatorMeta operatorMeta : allOperators) { infoList.add(fillLogicalOperatorInfo(operatorMeta)); @@ -2445,19 +2481,18 @@ public class StreamingContainerManager implements PlanContext loi.totalTuplesEmitted = operator.getStatus().totalTuplesEmitted; loi.totalTuplesProcessed = operator.getStatus().totalTuplesProcessed; loi.failureCount = operator.getStatus().failureCount; - loi.status = new HashMap(); - loi.partitions = new TreeSet(); - loi.unifiers = new TreeSet(); - loi.containerIds = new TreeSet(); - loi.hosts = new TreeSet(); + loi.status = new HashMap<>(); + loi.partitions = new TreeSet<>(); + loi.unifiers = new TreeSet<>(); + loi.containerIds = new TreeSet<>(); + loi.hosts = new TreeSet<>(); Collection physicalOperators = getPhysicalPlan().getAllOperators(operator); NumberAggregate.LongAggregate checkpointTimeAggregate = new NumberAggregate.LongAggregate(); for (PTOperator physicalOperator : physicalOperators) { OperatorStatus os = physicalOperator.stats; if (physicalOperator.isUnifier()) { loi.unifiers.add(physicalOperator.getId()); - } - else { + } else { loi.partitions.add(physicalOperator.getId()); // exclude unifier, not sure if we should include it in the future @@ -2556,7 +2591,7 @@ public class StreamingContainerManager implements PlanContext public List getStreamInfoList() { - List infoList = new ArrayList(); + List infoList = new ArrayList<>(); for (PTContainer container : this.plan.getContainers()) { for (PTOperator operator : container.getOperators()) { @@ -2572,8 +2607,7 @@ public class StreamingContainerManager implements PlanContext p.operatorId = String.valueOf(input.target.getId()); if (input.target.isUnifier()) { p.portName = StreamingContainer.getUnifierInputPortName(input.portName, operator.getId(), output.portName); - } - else { + } else { p.portName = input.portName; } si.sinks.add(p); @@ -2587,7 +2621,7 @@ public class StreamingContainerManager implements PlanContext private static class RecordingRequestFilter implements Predicate { - final static Set MATCH_TYPES = Sets.newHashSet(StramToNodeRequest.RequestType.START_RECORDING, StramToNodeRequest.RequestType.STOP_RECORDING, StramToNodeRequest.RequestType.SYNC_RECORDING); + static final Set MATCH_TYPES = Sets.newHashSet(StramToNodeRequest.RequestType.START_RECORDING, StramToNodeRequest.RequestType.STOP_RECORDING, StramToNodeRequest.RequestType.SYNC_RECORDING); @Override public boolean apply(@Nullable StramToNodeRequest input) @@ -2623,7 +2657,7 @@ public class StreamingContainerManager implements PlanContext private void updateOnDeployRequests(PTOperator p, Predicate superseded, StramToNodeRequest newRequest) { // filter existing requests - List cloneRequests = new ArrayList(p.deployRequests.size()); + List cloneRequests = new ArrayList<>(p.deployRequests.size()); for (StramToNodeRequest existingRequest : p.deployRequests) { if (!superseded.apply(existingRequest)) { cloneRequests.add(existingRequest); @@ -2699,11 +2733,13 @@ public class StreamingContainerManager implements PlanContext this.containerStopRequests.put(containerId, containerId); } - public Recoverable getSetOperatorProperty(String operatorName, String propertyName, String propertyValue) { + public Recoverable getSetOperatorProperty(String operatorName, String propertyName, String propertyValue) + { return new SetOperatorProperty(operatorName, propertyName, propertyValue); } - public Recoverable getSetPhysicalOperatorProperty(int operatorId, String propertyName, String propertyValue) { + public Recoverable getSetPhysicalOperatorProperty(int operatorId, String propertyName, String propertyValue) + { return new SetPhysicalOperatorProperty(operatorId, propertyName, propertyValue); } @@ -2807,7 +2843,7 @@ public class StreamingContainerManager implements PlanContext task.requestId = nodeToStramRequestIds.incrementAndGet(); task.waitTime = waitTime; request.requestId = task.requestId; - FutureTask future = new FutureTask(task); + FutureTask future = new FutureTask<>(task); dispatch(future); return future; } @@ -2817,8 +2853,7 @@ public class StreamingContainerManager implements PlanContext LogicalPlan lp = getLogicalPlan(); try { return lp.getAttributes().clone(); - } - catch (CloneNotSupportedException ex) { + } catch (CloneNotSupportedException ex) { throw new RuntimeException("Cannot clone DAG attributes", ex); } } @@ -2831,8 +2866,7 @@ public class StreamingContainerManager implements PlanContext } try { return logicalOperator.getAttributes().clone(); - } - catch (CloneNotSupportedException ex) { + } catch (CloneNotSupportedException ex) { throw new RuntimeException("Cannot clone operator attributes", ex); } } @@ -2884,7 +2918,7 @@ public class StreamingContainerManager implements PlanContext public FutureTask logicalPlanModification(List requests) throws Exception { // delegate processing to dispatch thread - FutureTask future = new FutureTask(new LogicalPlanChangeRunnable(requests)); + FutureTask future = new FutureTask<>(new LogicalPlanChangeRunnable(requests)); dispatch(future); //LOG.info("Scheduled plan changes: {}", requests); return future; @@ -2960,8 +2994,7 @@ public class StreamingContainerManager implements PlanContext if (journal != null) { journal.write(operation); } - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException("Failed to write to journal " + operation, e); } } @@ -2978,12 +3011,11 @@ public class StreamingContainerManager implements PlanContext public static StreamingContainerManager getInstance(RecoveryHandler rh, LogicalPlan dag, boolean enableEventRecording) throws IOException { try { - CheckpointState checkpointedState = (CheckpointState) rh.restore(); + CheckpointState checkpointedState = (CheckpointState)rh.restore(); StreamingContainerManager scm; if (checkpointedState == null) { scm = new StreamingContainerManager(dag, enableEventRecording, new SystemClock()); - } - else { + } else { // find better way to support final transient members PhysicalPlan plan = checkpointedState.physicalPlan; plan.getLogicalPlan().setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, dag.getAttributes().get(LogicalPlan.APPLICATION_ATTEMPT_ID)); @@ -2993,8 +3025,7 @@ public class StreamingContainerManager implements PlanContext f.setAccessible(true); try { f.set(plan, scm); - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException("Failed to set " + f, e); } f.setAccessible(false); @@ -3015,8 +3046,7 @@ public class StreamingContainerManager implements PlanContext LOG.debug("Restore container agent {} for {}", c.getExternalId(), c); StreamingContainerAgent sca = new StreamingContainerAgent(c, scm.newStreamingContainerContext(c), scm); scm.containers.put(c.getExternalId(), sca); - } - else { + } else { LOG.debug("Requesting new resource for {}", c.toIdStateString()); scm.requestContainer(c); } @@ -3025,8 +3055,7 @@ public class StreamingContainerManager implements PlanContext scm.recoveryHandler = rh; scm.checkpoint(); return scm; - } - catch (IOException e) { + } catch (IOException e) { throw new IllegalStateException("Failed to read checkpointed state", e); } } @@ -3106,16 +3135,16 @@ public class StreamingContainerManager implements PlanContext this.finals = new FinalVars(finals, lp); StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT); - if(sa instanceof AsyncFSStorageAgent){ + if (sa instanceof AsyncFSStorageAgent) { // replace the default storage agent, if present - AsyncFSStorageAgent fssa = (AsyncFSStorageAgent) sa; + AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa; if (fssa.path.contains(oldAppId)) { fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf); lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa); } } else if (sa instanceof FSStorageAgent) { // replace the default storage agent, if present - FSStorageAgent fssa = (FSStorageAgent) sa; + FSStorageAgent fssa = (FSStorageAgent)sa; if (fssa.path.contains(oldAppId)) { fssa = new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf); lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa); @@ -3182,7 +3211,7 @@ public class StreamingContainerManager implements PlanContext Thread.sleep(100); LOG.debug("Polling for a response to request with Id {}", requestId); } - if(obj != null) { + if (obj != null) { commandResponse.invalidate(requestId); return obj; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java index fe3f399..b36529a 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java @@ -18,24 +18,29 @@ */ package com.datatorrent.stram; -import com.datatorrent.stram.api.StramEvent.ContainerErrorEvent; -import com.datatorrent.stram.api.StramEvent.OperatorErrorEvent; -import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; -import com.datatorrent.stram.util.SecureExecutor; -import com.datatorrent.stram.webapp.OperatorInfo; import java.io.IOException; import java.net.InetSocketAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.*; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.datatorrent.stram.api.StramEvent.ContainerErrorEvent; +import com.datatorrent.stram.api.StramEvent.OperatorErrorEvent; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; +import com.datatorrent.stram.util.SecureExecutor; +import com.datatorrent.stram.webapp.OperatorInfo; /** * @@ -44,7 +49,8 @@ import org.slf4j.LoggerFactory; * * @since 0.3.2 */ -public class StreamingContainerParent extends org.apache.hadoop.service.CompositeService implements StreamingContainerUmbilicalProtocol { +public class StreamingContainerParent extends org.apache.hadoop.service.CompositeService implements StreamingContainerUmbilicalProtocol +{ private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerParent.class); private Server server; @@ -53,7 +59,8 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit private final StreamingContainerManager dagManager; private final int listenerThreadCount; - public StreamingContainerParent(String name, StreamingContainerManager dnodeMgr, SecretManager secretManager, int listenerThreadCount) { + public StreamingContainerParent(String name, StreamingContainerManager dnodeMgr, SecretManager secretManager, int listenerThreadCount) + { super(name); this.dagManager = dnodeMgr; this.tokenSecretManager = secretManager; @@ -61,42 +68,49 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit } @Override - public void init(Configuration conf) { - super.init(conf); + public void init(Configuration conf) + { + super.init(conf); } @Override - public void start() { + public void start() + { startRpcServer(); super.start(); } @Override - public void stop() { + public void stop() + { stopRpcServer(); super.stop(); } - protected void startRpcServer() { + protected void startRpcServer() + { Configuration conf = getConfig(); LOG.info("Config: " + conf); LOG.info("Listener thread count " + listenerThreadCount); try { server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(this) - .setBindAddress("0.0.0.0").setPort(0).setNumHandlers(listenerThreadCount).setSecretManager(tokenSecretManager).setVerbose(false).build(); + .setBindAddress("0.0.0.0").setPort(0).setNumHandlers(listenerThreadCount).setSecretManager(tokenSecretManager) + .setVerbose(false).build(); // Enable service authorization? if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { //refreshServiceAcls(conf, new MRAMPolicyProvider()); - server.refreshServiceAcl(conf, new PolicyProvider() { + server.refreshServiceAcl(conf, new PolicyProvider() + { @Override public Service[] getServices() { - return (new Service[] { - new Service(StreamingContainerUmbilicalProtocol.class.getName(), StreamingContainerUmbilicalProtocol.class) + return (new Service[]{ + new Service(StreamingContainerUmbilicalProtocol.class + .getName(), StreamingContainerUmbilicalProtocol.class) }); } @@ -111,11 +125,13 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit } } - protected void stopRpcServer() { + protected void stopRpcServer() + { server.stop(); } - public InetSocketAddress getAddress() { + public InetSocketAddress getAddress() + { return address; } @@ -126,24 +142,27 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit } void refreshServiceAcls(Configuration configuration, - PolicyProvider policyProvider) { + PolicyProvider policyProvider) + { this.server.refreshServiceAcl(configuration, policyProvider); } @Override public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return ProtocolSignature.getProtocolSignature(this, - protocol, clientVersion, clientMethodsHash); + long clientVersion, int clientMethodsHash) throws IOException + { + return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); } @Override - public long getProtocolVersion(String arg0, long arg1) throws IOException { + public long getProtocolVersion(String arg0, long arg1) throws IOException + { return StreamingContainerUmbilicalProtocol.versionID; } @Override - public void log(String containerId, String msg) throws IOException { + public void log(String containerId, String msg) throws IOException + { LOG.info("child msg: {} context: {}", msg, dagManager.getContainerAgent(containerId).container); } @@ -152,8 +171,7 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit { if (operators == null || operators.length == 0) { dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg)); - } - else { + } else { for (int operator : operators) { OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator); if (operatorInfo != null) { @@ -163,22 +181,23 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit } try { log(containerId, msg); - } - catch (IOException ex) { + } catch (IOException ex) { // ignore } } @Override public StreamingContainerContext getInitContext(String containerId) - throws IOException { + throws IOException + { StreamingContainerAgent sca = dagManager.getContainerAgent(containerId); return sca.getInitContext(); } @Override - public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) { + public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) + { // -- TODO // Change to use some sort of a annotation that developers can use to specify secure code // For now using SecureExecutor work load. Also change sig to throw Exception @@ -186,11 +205,13 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit if (msg.sentTms - now > 50) { LOG.warn("Child container heartbeat sent time for {} ({}) is greater than the receive timestamp in AM ({}). Please make sure the clocks are in sync", msg.getContainerId(), msg.sentTms, now); } - //LOG.debug("RPC latency from child container {} is {} ms (according to system clocks)", msg.getContainerId(), now - msg.sentTms); + //LOG.debug("RPC latency from child container {} is {} ms (according to system clocks)", msg.getContainerId(), + // now - msg.sentTms); dagManager.updateRPCLatency(msg.getContainerId(), now - msg.sentTms); try { final ContainerHeartbeat fmsg = msg; - return SecureExecutor.execute(new SecureExecutor.WorkLoad() { + return SecureExecutor.execute(new SecureExecutor.WorkLoad() + { @Override public ContainerHeartbeatResponse run() { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java index 5e30887..60c0d8a 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerSecurityInfo.java @@ -49,10 +49,9 @@ public class StreamingContainerSecurityInfo extends SecurityInfo public TokenInfo getTokenInfo(Class type, Configuration c) { TokenInfo tokenInfo = null; - if (type.equals(StreamingContainerUmbilicalProtocol.class)) - { - tokenInfo = new TokenInfo() { - + if (type.equals(StreamingContainerUmbilicalProtocol.class)) { + tokenInfo = new TokenInfo() + { @Override public Class> value() { @@ -64,7 +63,6 @@ public class StreamingContainerSecurityInfo extends SecurityInfo { return null; } - }; } return tokenInfo; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/StringCodecs.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StringCodecs.java b/engine/src/main/java/com/datatorrent/stram/StringCodecs.java index 2c0a0bc..8bd5ed5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StringCodecs.java +++ b/engine/src/main/java/com/datatorrent/stram/StringCodecs.java @@ -19,9 +19,9 @@ package com.datatorrent.stram; import java.net.URI; -import java.util.concurrent.ConcurrentHashMap; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +29,9 @@ import org.slf4j.LoggerFactory; import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.beanutils.Converter; +import com.google.common.base.Throwables; + import com.datatorrent.api.StringCodec; -import com.datatorrent.netlet.util.DTThrowable; /** *

StringCodecs class.

@@ -39,8 +40,8 @@ import com.datatorrent.netlet.util.DTThrowable; */ public class StringCodecs { - private static final ConcurrentHashMap classLoaders = new ConcurrentHashMap(); - private static final Map, Class>> codecs = new HashMap, Class>>(); + private static final ConcurrentHashMap classLoaders = new ConcurrentHashMap<>(); + private static final Map, Class>> codecs = new HashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(StringCodecs.class); private StringCodecs() @@ -69,11 +70,9 @@ public class StringCodecs StringCodec instance; try { instance = codec.newInstance(); - } - catch (IllegalAccessException ex) { + } catch (IllegalAccessException ex) { throw new RuntimeException("Internal Error - it's impossible for this exception to be thrown!", ex); - } - catch (InstantiationException ex) { + } catch (InstantiationException ex) { throw new RuntimeException("Internal Error - it's impossible for this exception to be thrown!", ex); } @@ -113,7 +112,7 @@ public class StringCodecs try { register(entry.getValue(), entry.getKey()); } catch (Exception ex) { - DTThrowable.rethrow(ex); + throw Throwables.propagate(ex); } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java index 5f13797..605944e 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java +++ b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java @@ -29,10 +29,10 @@ import com.datatorrent.api.Context; */ public interface ContainerContext extends Context { - public static final Attribute IDENTIFIER = new Attribute("unknown_container_id"); - public static final Attribute BUFFER_SERVER_MB = new Attribute(8*64); - public static final Attribute BUFFER_SERVER_TOKEN = new Attribute(null, null); - public static final Attribute REQUEST_FACTORY = new Attribute(null, null); + public static final Attribute IDENTIFIER = new Attribute<>("unknown_container_id"); + public static final Attribute BUFFER_SERVER_MB = new Attribute<>(8 * 64); + public static final Attribute BUFFER_SERVER_TOKEN = new Attribute<>(null, null); + public static final Attribute REQUEST_FACTORY = new Attribute<>(null, null); @SuppressWarnings("FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeInitializer.initialize(ContainerContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/85db448e/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java b/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java index d200803..52f0959 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java +++ b/engine/src/main/java/com/datatorrent/stram/api/ContainerEvent.java @@ -32,15 +32,15 @@ import com.datatorrent.stram.engine.StreamContext; */ public interface ContainerEvent { - Class CONTAINER_EVENTS_LISTENERS[] = new Class[] { - com.datatorrent.stram.engine.BufferServerStatsSubscriber.class, - com.datatorrent.stram.debug.TupleRecorderCollection.class + Class[] CONTAINER_EVENTS_LISTENERS = new Class[]{ + com.datatorrent.stram.engine.BufferServerStatsSubscriber.class, + com.datatorrent.stram.debug.TupleRecorderCollection.class }; /** * Node event used for various events associated with nodes. */ - public static abstract class NodeEvent implements ContainerEvent + abstract class NodeEvent implements ContainerEvent { private Node node; @@ -59,7 +59,7 @@ public interface ContainerEvent /** * Event representing node activation. */ - public static class NodeActivationEvent extends NodeEvent + class NodeActivationEvent extends NodeEvent { public NodeActivationEvent(Node node) { @@ -71,7 +71,7 @@ public interface ContainerEvent /** * Event representing node deactivation. */ - public static class NodeDeactivationEvent extends NodeEvent + class NodeDeactivationEvent extends NodeEvent { public NodeDeactivationEvent(Node node) { @@ -83,7 +83,7 @@ public interface ContainerEvent /** * Event representing stats in the container from the ContainerStats object. */ - public static class ContainerStatsEvent implements ContainerEvent + class ContainerStatsEvent implements ContainerEvent { private ContainerStats containerStats; @@ -102,7 +102,7 @@ public interface ContainerEvent /** * Event representing streams. */ - public static abstract class StreamEvent implements ContainerEvent + abstract class StreamEvent implements ContainerEvent { private ComponentContextPair stream; @@ -121,7 +121,7 @@ public interface ContainerEvent /** * Event representing stream activation. */ - public static class StreamActivationEvent extends StreamEvent + class StreamActivationEvent extends StreamEvent { public StreamActivationEvent(ComponentContextPair stream) { @@ -133,7 +133,7 @@ public interface ContainerEvent /** * Event representing stream deactivation. */ - public static class StreamDeactivationEvent extends StreamEvent + class StreamDeactivationEvent extends StreamEvent { public StreamDeactivationEvent(ComponentContextPair stream) {