Subject: Re: Review Request 44920: SAMZA-881
From: Jagadish Venkatraman
To: Navina Ramesh, "Yi Pan (Data Infrastructure)", Chris Pettitt, Boris Shkolnik, Jake Maes, Xinyu Liu
Cc: Jagadish Venkatraman, samza
Date: Mon, 04 Apr 2016 20:17:18 -0000

> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > I have some high level questions w/ the current class layout/hierarchy. Will sync up w/ Jagadish in person.

Thanks for the great feedback :-) I'll update the revised patch soon. I'm publishing this now so that I don't lose my responses to your comments.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 85
> >
> > Is this a copy of the AbstractContainerAllocator in samza-yarn? Or a modified version? If it is a simple copy, I would suggest to remove the code in samza-yarn, s.t. we don't leave dup codes in two different places.

This is not a copy. This class is independent of Yarn, so it contains changes that make its logic agnostic to Yarn and its dependencies. Unless we deprecate samza-yarn/AbstractContainerAllocator (ACA), we cannot delete it. During this interim migration period, we should make sure that changes to samza-yarn/ACA are ported back to samza-core/ACA.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 143
> >
> > I assume that the lines between 143 to 148 are read/write to resourceRequestState object? Shouldn't it be synchronized for thread-safety?

In this case, both the read and the write happen in the same thread (the allocator thread). Even if the allocator thread is preempted after the peek() and the callback thread adds a new resource via addResource(), that access is protected by a lock in ContainerRequestState. Also, once we allocate the resource, its release happens within the same allocator thread, so the resource will not be preempted to run another container. Please let me know if this reasoning is off.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 149
> >
> > Why is this "expectedContainerId"? Is there a case that we asked for container_0 and we get back container_9?

Good point, I've fixed it.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 160
> >
> > I would prefer to:
> > 1) wrap the internal member variable via accessor methods of SamzaAppState
> > 2) if jobHealthy is purely derived from neededResources.get() == 0, do not introduce an additional variable for that.

1. Good point; I agree on (1). Providing thread-safe accessors to SamzaAppState is tracked as part of the umbrella ticket https://issues.apache.org/jira/browse/SAMZA-902.
2. jobHealthy is not derived solely from neededResources.get() == 0. It is used in AbstractContainerAllocator, SamzaAMMetrics, SamzaTaskManager, and SamzaAMLifecycle, and some of those usages set jobHealthy to true in cases where that is not equivalent to neededResources.get() == 0.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 157
> >
> > Wouldn't it be better to encapsulate the container launch and app state change in a single method?
> > I thought Jacob did some refactor in the samza-yarn old classes. Is it just not picked up by this patch yet?

All of Jake's changes (apart from some minor utility methods) are picked up by this patch. I agree with your suggestion. Giving SamzaAppState thread-safe accessors is tracked in SAMZA-902 and is not straightforward, partly because of spaghetti-ish access to globals everywhere and accesses to member variables from the Scalate/.scaml UI template files. I was planning to do that as a separate change, since this change was scoped to the AM inversion, but I'm happy to do it in this patch if you prefer.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 216
> >
> > What's the UUID here for? Since it is purely random UUID in the caller, can we remove that and provide a version of constructor that like below?
> > {code}
> > SamzaResourceRequest(int cpuCores, int memMb, String host, String containerId)
> > {code}

Good point. Thanks for the suggestion; I'll fix this.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 65
> >
> > nit: could use the same annotation as other variables.

Fixed!


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 49
> >
> > Since we are adding multi-threaded container models, would it be a reasonable time to add thread-safety in all new classes? Since the job coordinator mainly is requesting/allocating containers, we can start w/ a simple synchronous keyword on all methods, w/o too much concerns of the performance degradation.

The cluster-based job coordinator currently exposes a single run() method, which serves as the main() function.
I'll ensure that all newly added methods, namely start and getJobModel (when we move to the StreamProcessor-based process model), are thread-safe.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 56
> >
> > question: I thought that this *is* an implementation of JobCoordinator API??? Otherwise, the class name is really confusing since it suggest that.

Clarified offline. I think there was some misconception about the scope of this RB; I should have called out that it was scoped to do just the inversion. SAMZA-881 is larger in scope and will build on this.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 80
> >
> > suggestion for future: is this an inherited metrics class for YARN? I think that we should create a more generic SamzaJobCoordinatorMetrics later.

This class is independent of Yarn, so a better name may be JobCoordinatorMetrics. We'll need to document that we're changing some metrics namespaces.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 103
> >
> > I thought that we *always* start JMX server in the containers? Why are we turning it off?

We have a feature to turn JMX off; the property is yarn.am.jmx.enabled.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 131
> >
> > Question: why isn't this JmxServer object's lifecycle == StreamProcessor? Or even, the whole user-defined process?

Discussed offline. We agreed that the JmxServer could be passed in externally. It's still useful for the JmxServer class to distinguish between instantiating the server and starting it.
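
To make the JMX points above concrete, here is a minimal sketch, under stated assumptions, of a coordinator that receives an already-constructed JMX server from its caller, starts it only in start(), and has no server at all when yarn.am.jmx.enabled is false. ManagedServer, FakeJmxServer, CoordinatorSketch, and the rule that a missing key means "enabled" are hypothetical; this is not Samza's JmxServer or ClusterBasedJobCoordinator API. The coarse synchronized methods mirror the suggestion of synchronizing the new coordinator methods.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class JmxLifecycleSketch {

  /** Hypothetical stand-in for a JMX server: constructing one has no side effects, start() does the work. */
  interface ManagedServer {
    void start();
    void stop();
    boolean isRunning();
  }

  /** In-memory fake so the sketch runs without opening any ports. */
  static final class FakeJmxServer implements ManagedServer {
    private boolean running = false;
    public synchronized void start() { running = true; }
    public synchronized void stop() { running = false; }
    public synchronized boolean isRunning() { return running; }
  }

  /** Reads the toggle named in the review; treating a missing key as "enabled" is an assumption. */
  static boolean jmxEnabled(Map<String, String> config) {
    return Boolean.parseBoolean(config.getOrDefault("yarn.am.jmx.enabled", "true"));
  }

  /** Hypothetical coordinator: the caller creates (or omits) the server; the coordinator only starts/stops it. */
  static final class CoordinatorSketch {
    private final Optional<ManagedServer> jmxServer;
    private boolean started = false; // guarded by "this"

    CoordinatorSketch(Optional<ManagedServer> jmxServer) {
      this.jmxServer = jmxServer;
    }

    // Coarse-grained synchronized lifecycle methods.
    synchronized void start() {
      if (started) {
        return;
      }
      jmxServer.ifPresent(ManagedServer::start);
      started = true;
    }

    synchronized void stop() {
      if (!started) {
        return;
      }
      jmxServer.ifPresent(ManagedServer::stop);
      started = false;
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("yarn.am.jmx.enabled", "false");

    // The caller decides whether a server exists at all; the coordinator never constructs one.
    Optional<ManagedServer> server = jmxEnabled(config)
        ? Optional.<ManagedServer>of(new FakeJmxServer())
        : Optional.<ManagedServer>empty();

    CoordinatorSketch coordinator = new CoordinatorSketch(server);
    coordinator.start();
    System.out.println("JMX server present: " + server.isPresent()); // prints: JMX server present: false
    coordinator.stop();
  }
}
```

Keeping construction free of side effects is what lets an outer owner (for example, a StreamProcessor-level process) hold the server and hand it in, while the coordinator still controls when it starts.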

> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 154
> >
> > Curious: why do we need all these to create ContainerProcessManager? I thought that we should only need cluster manager related config info? Something like: ClusterManagerConfig???

Discussed offline. We agreed that we need:
1. the callback, and
2. the SamzaAppState (which encapsulates the JModelManager).

Passing SamzaAppState does leak some abstractions to the cluster resource manager. In the future, we should define a standard interface that all cluster managers support (e.g., getLogFileForStreamProcessor(id)), so that we can offer the same Samza AM UI across cluster managers and even in stand-alone mode.


> On March 31, 2016, 10:43 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 45
> >
> > nit: If you want to change this, open a separate JIRA and remove the TODO here.

Done. Thanks for the feedback!


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review126385
-----------------------------------------------------------


On March 31, 2016, 10:40 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> -----------------------------------------------------------
> 
> (Updated March 31, 2016, 10:40 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> 1. Proposed new APIs for running Samza without Yarn (SAMZA-881).
> - Defined the ContainerProcessManager abstraction.
> - Defined SamzaResource and SamzaResourceRequest.
> - Rewrote the SamzaAppMaster logic into a ClusterBasedJobCoordinator.
> 2. Defined a ClusterManagerConfig to handle configurations independently of Yarn/Mesos.
> 3. Made Samza completely independent of Yarn. This cleanly separates Samza-specific components from Yarn-specific components.
> 4. Readability improvements to the existing code base.
> - Added explicit documentation for every method, member, and class (including on thread-safety).
> - Made internal variables final to document intent and visibility across threads (trivially by adding modifiers, or by changing where they're initialized).
> 5. Refactored JobCoordinator into a JobModelReader.
> 
> == Diff2 ==
> Address review feedback.
> 
> Design Doc: https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
> 
> 
> Diffs
> -----
> 
> samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManagerFactory.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/coordinator/JobModelReader.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/metrics/SamzaAppMasterMetrics.scala PRE-CREATION
> samza-shell/src/main/bash/run-am.sh 9545a96953baaff17ad14962e02bc12aadbb1101
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnAppState.java PRE-CREATION
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManager.java PRE-CREATION
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManagerFactory.java PRE-CREATION
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerRunner.java PRE-CREATION
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaAppMasterService.scala PRE-CREATION
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaYarnAppMasterLifecycle.scala PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/44920/diff/
> 
> 
> Testing
> -------
> 
> Tested with sample jobs on clusters of varying sizes. Tested locally. TODO: Unit tests.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
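
Two points from this thread, the UUID-free SamzaResourceRequest constructor suggested for line 216 and the locking argument for lines 143 to 148, can be illustrated with a minimal sketch. ResourceRequest and RequestState are hypothetical stand-ins, not the classes in this patch. The sketch assumes resources are buffered per host in FIFO order, that the callback thread only appends, and that only the allocator thread peeks and removes. Under those assumptions, a resource the allocator thread has peeked stays at the head of its queue even if the callback thread adds another one before the allocator removes it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;

public class ResourceRequestSketch {

  /** Hypothetical request: the random request ID is generated here instead of by every caller. */
  static final class ResourceRequest {
    final int cpuCores;
    final int memMb;
    final String preferredHost;
    final String containerId;
    final String requestId = UUID.randomUUID().toString();

    ResourceRequest(int cpuCores, int memMb, String preferredHost, String containerId) {
      this.cpuCores = cpuCores;
      this.memMb = memMb;
      this.preferredHost = preferredHost;
      this.containerId = containerId;
    }
  }

  /** Hypothetical per-host FIFO buffer of allocated resources; every access takes the same lock. */
  static final class RequestState {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Deque<String>> resourcesByHost = new HashMap<>();

    /** Callback thread: only ever appends, so it never changes the head of a queue. */
    void addResource(String host, String resourceId) {
      lock.lock();
      try {
        resourcesByHost.computeIfAbsent(host, h -> new ArrayDeque<>()).addLast(resourceId);
      } finally {
        lock.unlock();
      }
    }

    /** Allocator thread: returns the oldest buffered resource for the host, or null if none. */
    String peekResource(String host) {
      lock.lock();
      try {
        Deque<String> queue = resourcesByHost.get(host);
        return queue == null ? null : queue.peekFirst();
      } finally {
        lock.unlock();
      }
    }

    /** Allocator thread: removes a resource once it has been used to launch a container. */
    boolean removeResource(String host, String resourceId) {
      lock.lock();
      try {
        Deque<String> queue = resourcesByHost.get(host);
        return queue != null && queue.remove(resourceId);
      } finally {
        lock.unlock();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    RequestState state = new RequestState();
    ResourceRequest request = new ResourceRequest(1, 1024, "host-1", "0");
    state.addResource("host-1", "resource-A");

    String peeked = state.peekResource(request.preferredHost);
    // Simulate the callback thread adding a resource between the allocator's peek and remove.
    Thread callback = new Thread(() -> state.addResource("host-1", "resource-B"));
    callback.start();
    callback.join();

    boolean removed = state.removeResource(request.preferredHost, peeked);
    System.out.println(peeked + " removed=" + removed + ", next=" + state.peekResource("host-1"));
    // prints: resource-A removed=true, next=resource-B
  }
}
```

The ReentrantLock only mirrors the lock mentioned for ContainerRequestState; synchronized methods would give the same guarantee here.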