Subject: Re: Review Request 13725: SAMZA-2
From: "Chris Riccomini"
To: "Jay Kreps", "samza", "Chris Riccomini"
Date: Fri, 30 Aug 2013 22:25:35 -0000

> On Aug. 30, 2013, 9:07 p.m., Jay Kreps wrote:
> > This looks good.
> > 
> > What is a situation in which you would ever need a start and stop in the MessageChooser? Since this is just logic, you shouldn't need this, right?
> > 
> > I wonder if this is still a bit high level. Coming to the specific use cases:
> > 1. Prioritize by time
> > 2. Bootstrap state first
> > 3. Remain roughly equally behind on each stream
> > 
> > If you think about these, for the first two we haven't really made it easy. For the third one it still isn't possible.
> > 
> > I kind of feel that making this pluggable is nice, but getting a fully featured default implementation is more important. This implementation should:
> > a. Allow preferring certain streams for the state bootstrap
> > b. Keep all streams of the same priority roughly in sync.
> > 
> > Maybe this is just something each user needs to do once and then reuse that class?

The main motivation for start/stop is to allow developers to set up a client that queries some outside service to make picking decisions. I'm not saying that this is advisable, but I know people will try to do it. Without stop, there's no way to shut down the client when the processor stops. If we assume the processor never stops, then this isn't a problem, but if there is a definite "end" to the processor (i.e. TaskCoordinator.shutdown), then the chooser needs a graceful shutdown.

The motivation for register is that there are situations where you want to initialize your data structures (or whatever) on startup, before any messages are received. Letting the MessageChooser know which SystemStreamPartitions it is going to receive messages from just seems like a good idea. For example, if we wanted to implement Sriram's version of RoundRobinChooser, you could set up a Map[SystemStreamPartition, ArrayDeque] on start, instead of having to check the map every time update is called and lazily insert a new queue if none exists.

(1) can be done by implementing a PriorityChooser that yanks out the timestamp from the message.
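For (1), here is a rough sketch of what such a chooser could look like, and where register, start, and stop fit in the lifecycle described above. Everything in it is hypothetical: Ssp, Envelope, and Chooser are simplified stand-ins for SystemStreamPartition, IncomingMessageEnvelope, and MessageChooser (not their real signatures), TimestampPriorityChooser is not the PriorityChooser in this patch, and the timestamp extractor is something the job author would have to supply, since the envelope carries no time.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for SystemStreamPartition: a stream name plus a partition number.
final class Ssp {
  final String stream;
  final int partition;

  Ssp(String stream, int partition) {
    this.stream = stream;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Ssp)) return false;
    Ssp other = (Ssp) o;
    return stream.equals(other.stream) && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return 31 * stream.hashCode() + partition;
  }
}

// Hypothetical stand-in for IncomingMessageEnvelope: an offset and a payload, no timestamp.
final class Envelope {
  final Ssp ssp;
  final String offset;
  final Object message;

  Envelope(Ssp ssp, String offset, Object message) {
    this.ssp = ssp;
    this.offset = offset;
    this.message = message;
  }
}

// Hypothetical lifecycle: register every partition, start, interleave update/choose, then stop.
interface Chooser {
  void register(Ssp ssp);
  void start();
  void update(Envelope envelope);
  Envelope choose(); // null means "nothing to process right now"
  void stop();
}

// Returns the buffered message with the oldest timestamp, as read from the payload
// by a user-supplied extractor.
final class TimestampPriorityChooser implements Chooser {
  private final Set<Ssp> registered = new HashSet<>();
  private final PriorityQueue<Envelope> buffered;

  TimestampPriorityChooser(ToLongFunction<Object> timestampOf) {
    this.buffered = new PriorityQueue<>(
        Comparator.comparingLong((Envelope e) -> timestampOf.applyAsLong(e.message)));
  }

  @Override
  public void register(Ssp ssp) {
    // Partitions are known up front, so per-partition state could be allocated here.
    registered.add(ssp);
  }

  @Override
  public void start() {
    // A chooser backed by an outside service would open its client here.
  }

  @Override
  public void update(Envelope envelope) {
    if (!registered.contains(envelope.ssp)) {
      throw new IllegalStateException("update for unregistered stream " + envelope.ssp.stream);
    }
    buffered.add(envelope);
  }

  @Override
  public Envelope choose() {
    return buffered.poll(); // oldest timestamp first; null when the buffer is empty
  }

  @Override
  public void stop() {
    // ...and close that client here. This sketch only drops its buffer.
    buffered.clear();
  }
}
```

PriorityQueue breaks ties arbitrarily, so messages with equal timestamps come out in no particular order; a real implementation would probably want a secondary ordering, such as arrival order.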
I don't think we can do much better than this without adding a "time" field to the IncomingMessageEnvelope and making consumers/serdes supply it.

(2) CAN'T be done right now because we don't have any concept of "end of stream" in the IncomingMessageEnvelope. Since our fetches are asynchronous, the fact that the chooser has no more messages for a given SSP doesn't actually mean that the SSP is at head. There is no way to tell, and without this, you can't be guaranteed that you've bootstrapped all state before processing messages.

(3) Do you mean equally behind in terms of messages from head, or equally behind in terms of wall-clock time? Messages from head can't be done, for the same reason as (2). Equally behind in wall-clock time is just the same as (1), isn't it?

Correct me if I'm wrong, but it seems like the main issue you have is that we don't have a way to measure how far behind a message is (in messages behind, or in milliseconds behind). Without this, neither (a) nor (b) is possible. All we have in the IncomingMessageEnvelope is the offset, which has no relation to head (or to a timestamp). Without this, I don't think we can satisfy either criterion, whether we have a good default implementation or not.

- Chris

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13725/#review25804
-----------------------------------------------------------

On Aug. 30, 2013, 5:47 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/13725/
> -----------------------------------------------------------
> 
> (Updated Aug. 30, 2013, 5:47 p.m.)
> 
> Review request for samza.
> 
> Repository: samza
> 
> Description
> -------
> 
> added start, stop, and register to message chooser.
> 
> adding docs for message chooser. switching round robin chooser back to a queue.
> 
> missed license in message chooser factory
> 
> add apache licensing
> 
> samza container was using message chooser, not message chooser factory. fixed.
> 
> add stream chooser test. update stream chooser to invert priority due to bug.
> 
> add round robin test. fix compile error in round robin chooser.
> 
> add priority chooser test. fix bug in priority chooser that was reversing ordering.
> 
> adding stream chooser. adding message chooser factory.
> 
> adding priority chooser. moving default chooser to round robin chooser. adding config for chooser
> 
> Diffs
> -----
> 
>   docs/learn/documentation/0.7.0/container/streams.md e755789407b294e02b399e71ba684c1d6dc314c6
>   samza-api/src/main/java/org/apache/samza/system/MessageChooser.java 306b2902303c72f3d7a3eb313f55d7e88d21e00d
>   samza-api/src/main/java/org/apache/samza/system/PriorityChooser.java PRE-CREATION
>   samza-api/src/test/java/org/apache/samza/system/TestPriorityChooser.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/StreamChooserConfig.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 0c742d83c2f60d2448a79376677713a1ff0b11ec
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 2d2efdd14c7680c29aad5f2a98349e2fc57cf9fe
>   samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala 5a72e7a3bfba0f06a5a98c6ba26865800d7780b9
>   samza-core/src/main/scala/org/apache/samza/system/RoundRobinChooser.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/StreamChooser.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b18f0cc5a21088a58db1c26ff43bba06dd3165ac
>   samza-core/src/test/scala/org/apache/samza/system/TestRoundRobinChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/TestStreamChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/13725/diff/
> 
> Testing
> -------
> 
> Thanks,
> 
> Chris Riccomini