From: Jef G
Date: Thu, 12 Oct 2017 13:00:29 -0400
Subject: Re: buffering records for join
To: dev@samza.apache.org

Yi, thanks for your detailed reply!

I believe what you are suggesting with a KV store is to set up a remote,
durable system to maintain the join state, so that if a node dies and Samza
restarts the task on another node, the join state is still available. Is
that correct?

That approach is certainly an option. However, we were hoping to use an
in-process, in-memory KV store, since a remote store would introduce a lot
of latency for us. In some cases we would have to make more than 100,000
round trips per second to the KV store for a single stream, and we want to
be able to scale beyond that. A remote store also adds complexity and
another point of failure.

Regarding using AsyncStreamTask with a very large task.max.concurrency
(around 100,000): is that a bad idea?

The high-level API is based on the KStream API, right? Our jobs will
sometimes need to join as many as 20 input streams. I believe Samza (and
KStream) currently supports only binary joins; if that is the case, we
would need 19 chained binary joins. The KStream documentation suggests that
all intermediate results are persisted, so that many chained joins could be
very inefficient. If so, we would prefer to use the "classic" API.

-Jef

On Wed, Oct 11, 2017 at 7:37 PM, Yi Pan wrote:

> Hi, Jef,
>
> I would recommend that you use a KV store to buffer the messages for the
> join. The logic would be more predictable, and the state is also durable.
> In StreamTask.process(), you can use code like the following:
>
> {code}
> public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>     TaskCoordinator coordinator) {
>   // Buffer the message in the store that corresponds to its input stream.
>   String stream = envelope.getSystemStreamPartition().getStream();
>   if ("streamA".equals(stream)) {
>     storeA.put(envelope.getKey(), envelope.getMessage());
>   } else {
>     storeB.put(envelope.getKey(), envelope.getMessage());
>   }
>   // Once the join condition is met, join the buffered messages.
>   if (joinConditionMet()) {
>     doJoin(storeA, storeB, collector);
>   }
> }
> {code}
>
> Make sure that you configure storeA and storeB with changelogs so that
> they can be recovered. Then you don't need to worry about data loss:
> before the auto-checkpoint, your buffered messages are flushed to disk and
> to the changelog via storeA and storeB. If you do not want to delete each
> and every buffered message after the join, you can set a TTL on each store
> if you are using the RocksDB store.
>
> We are also actively working on a built-in join operator in the Samza
> high-level API. The new high-level API was released in Samza 0.13.1, with
> a feature preview here:
> http://samza.apache.org/startup/preview/#high-level-api. Feel free to take
> a look and try it; we would love to hear your feedback. The current
> version does not yet support durable state in joins; we are actively
> working on durable-state support for the next release. Note that the
> high-level API is still in early evolution and might change in the next
> two releases.
>
> Best!
>
> -Yi
>
> On Wed, Oct 11, 2017 at 1:56 PM, Jef G wrote:
>
> > Hello. My team is looking into Samza for doing real-time processing. We
> > would like to run a directed graph of jobs, where the records in each
> > job's input streams are joined on a common key. We have functionality
> > to perform the join by buffering records from the input streams until
> > certain conditions are met and then passing them on.
> >
> > We are wondering about the best way to integrate this functionality
> > into a Samza job. After looking over the API, we see two possibilities:
> >
> > 1. Use a StreamTask that adds records to a buffer.
> > This is the method that the "ad event" example uses. But we are
> > concerned that the framework commits a StreamTask's offset after
> > process() completes, so if the job fails, records in the buffer are
> > permanently lost.
> >
> > 2. Use an AsyncStreamTask that adds records to a buffer, along with
> > their TaskCallbacks. When records are eventually joined and processed,
> > complete their callbacks. This method seems promising, but it requires
> > setting task.max.concurrency very high - possibly in the tens of
> > thousands in our case. Are we likely to run into any issues doing that?
> >
> > Are there any other options that we overlooked? What is the best
> > approach?
> >
> > -Jef G

--
Jef G
Senior Data Scientist | Dataminr | dataminr.com
99 Madison Ave, 3rd Floor | New York, NY 10016
jefg@dataminr.com
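The callback-buffering idea in option 2 can be sketched independently of the Samza API. The following is a minimal, self-contained illustration: `Callback` is a hypothetical stand-in for Samza's TaskCallback, `JoinBuffer` is an invented helper, and the join condition is simplified to "one message has arrived from every input stream for a given key." A real AsyncStreamTask would pass the framework-supplied callback into processAsync and would also need eviction for keys that never complete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for Samza's TaskCallback commit hook. */
interface Callback {
    void complete();
}

/**
 * Buffers one message per input stream under a join key. When every
 * stream has contributed a message for that key, the joined tuple is
 * emitted and all buffered callbacks are completed, which is what would
 * let the framework advance offsets past the joined records.
 */
class JoinBuffer<V> {
    private final int numStreams;
    private final Consumer<List<V>> onJoin;
    private final Map<String, Map<Integer, V>> values = new HashMap<>();
    private final Map<String, List<Callback>> callbacks = new HashMap<>();

    JoinBuffer(int numStreams, Consumer<List<V>> onJoin) {
        this.numStreams = numStreams;
        this.onJoin = onJoin;
    }

    /** Buffer one message from stream {@code streamIndex} under {@code key}. */
    void add(String key, int streamIndex, V value, Callback cb) {
        values.computeIfAbsent(key, k -> new HashMap<>()).put(streamIndex, value);
        callbacks.computeIfAbsent(key, k -> new ArrayList<>()).add(cb);

        Map<Integer, V> perStream = values.get(key);
        if (perStream.size() == numStreams) {  // simplified join condition
            List<V> joined = new ArrayList<>();
            for (int i = 0; i < numStreams; i++) {
                joined.add(perStream.get(i));
            }
            onJoin.accept(joined);
            // Only now are the callbacks completed, so unjoined records
            // are never checkpointed past.
            callbacks.remove(key).forEach(Callback::complete);
            values.remove(key);
        }
    }
}
```

Note that until a key joins, its callbacks stay open, so the number of in-flight callbacks is bounded by the number of unjoined buffered records. That is exactly why task.max.concurrency would need to be set near the buffer size, which is the concern raised in the thread.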