From: Pramod Immaneni
Date: Thu, 17 Dec 2015 13:06:33 -0800
Message-ID: <7178413472941578920@unknownmsgid>
Subject: Re: Database Output Operator Improvements
To: dev@apex.incubator.apache.org

Why can't the reconciler as it exists today be enhanced to write more optimally?
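One way to read "write more optimally" is to drain committed tuples in batches rather than one database round trip per tuple. A minimal sketch of that idea, not the Malhar API: the class name `BatchingReconciler` and all method names here are hypothetical, and an in-memory list stands in for the actual bulk write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a reconciler whose committed-window backlog is
// drained in batches instead of one tuple at a time.
public class BatchingReconciler {
    private final BlockingQueue<String> committed = new ArrayBlockingQueue<>(1024);
    private final int batchSize;
    private final List<List<String>> batchesWritten = new ArrayList<>();

    public BatchingReconciler(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called when a window is committed; tuples become eligible for output.
    public boolean enqueueCommitted(String tuple) {
        return committed.offer(tuple);
    }

    // Drains up to batchSize tuples and issues a single bulk write,
    // instead of one database round trip per tuple.
    public int drainOnce() {
        List<String> batch = new ArrayList<>(batchSize);
        committed.drainTo(batch, batchSize);
        if (!batch.isEmpty()) {
            batchesWritten.add(batch); // stand-in for a bulk INSERT / batch mutation
        }
        return batch.size();
    }

    public List<List<String>> getBatchesWritten() {
        return batchesWritten;
    }
}
```

The same batching loop could run on a separate thread so that slow bulk writes never block the operator's processing of incoming windows.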
> On Dec 17, 2015, at 12:07 PM, Chandni Singh wrote:
>
> The question is: with databases like HBase and Cassandra, which are themselves
> backed by a file system like HDFS, why write to HDFS when the database
> connection is healthy?
>
> These are distributed, scalable, and performant databases.
>
> IMO the reconciler approach isn't the best here. It fits the needs when the
> external entity is always slow, which was the original use case. We can spool
> to HDFS when the connection is unhealthy.
>
> If this is properly implemented, it can address all the other points
> mentioned by Ashwin.
>
> Also, I think benchmarking such solutions will help us compare them and
> decide which use case they fit best.
>
> Chandni
>
> On Thu, Dec 17, 2015 at 11:49 AM, Ashwin Chandra Putta <
> ashwinchandrap@gmail.com> wrote:
>
>> Tim,
>>
>> Are you saying HDFS is slower than a database? :)
>>
>> I think the Reconciler is the best approach. The tuples need not be written
>> to HDFS; they can be queued in memory. You can spool them to HDFS only when
>> the queue reaches its limits. The Reconciler solves a few major problems,
>> as you described above:
>>
>> 1. Graceful reconnection. When the external system we are writing to is
>> down, the Reconciler spools the messages to the queue and then to HDFS.
>> The tuples are written to the external system only after it is back up
>> again.
>> 2. Handling surges. There will be cases when throughput surges for some
>> period and the external system is not fast enough to keep up with the
>> writes. In those cases, the Reconciler spools the incoming tuples to the
>> queue/HDFS and then writes at the pace of the external system.
>> 3. DAG slowdown. Again, in case of an external system failure or a slow
>> connection, we do not want to block the windows from moving forward. If
>> the windows are blocked for a long time, STRAM will unnecessarily kill
>> the operator. The Reconciler makes sure that the incoming messages are
>> just queued/spooled to HDFS (the external system does not block the DAG),
>> so the DAG is not slowed down.
>>
>> Regards,
>> Ashwin.
>>
>> On Thu, Dec 17, 2015 at 11:29 AM, Timothy Farkas wrote:
>>
>>> Yes, that is true, Chandni, and considering how slow HDFS is, we should
>>> avoid writing to it if we can.
>>>
>>> It would be great if someone could pick up the ticket :).
>>>
>>> On Thu, Dec 17, 2015 at 11:17 AM, Chandni Singh wrote:
>>>
>>>> +1 for Tim's suggestion.
>>>>
>>>> Using the reconciler means always writing to HDFS and then reading from
>>>> that. Tim's suggestion is that we only write to HDFS when the database
>>>> connection is down. This is analogous to spooling.
>>>>
>>>> Chandni
>>>>
>>>> On Thu, Dec 17, 2015 at 11:13 AM, Pramod Immaneni <
>>>> pramod@datatorrent.com> wrote:
>>>>
>>>>> Tim, we have a pattern for this called Reconciler that Gaurav has also
>>>>> mentioned. There are some examples of it in Malhar.
>>>>>
>>>>> On Thu, Dec 17, 2015 at 9:47 AM, Timothy Farkas wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> One of our users is outputting to Cassandra, but they want to handle
>>>>>> a Cassandra failure or Cassandra downtime gracefully from an output
>>>>>> operator. Currently a lot of our database operators will just fail
>>>>>> and redeploy continually until the database comes back. This is a bad
>>>>>> idea for a couple of reasons:
>>>>>>
>>>>>> 1 - We rely on buffer server spooling to prevent data loss. If the
>>>>>> database is down for a long time (several hours or a day), we may run
>>>>>> out of space for the buffer server to spool to, since it spools to
>>>>>> local disk and data is purged only after a window is committed.
>>>>>> Furthermore, this buffer server problem will exist for all the
>>>>>> streaming containers in the DAG, not just the one immediately
>>>>>> upstream from the output operator, since data is spooled to disk for
>>>>>> all operators and only removed once a window is committed.
>>>>>>
>>>>>> 2 - If there is another failure further upstream in the DAG, upstream
>>>>>> operators will be redeployed to a checkpoint less than or equal to
>>>>>> the checkpoint of the database operator in the at-least-once case.
>>>>>> This could mean redoing several hours' or a day's worth of
>>>>>> computation.
>>>>>>
>>>>>> We should support a mechanism to detect when the connection to a
>>>>>> database is lost, spool to HDFS using a WAL, and then write the
>>>>>> contents of the WAL into the database once it comes back online. This
>>>>>> will save the local disk space of all the nodes used in the DAG and
>>>>>> allow it to be used only for the data being output by the output
>>>>>> operator.
>>>>>>
>>>>>> Ticket here if anyone is interested in working on it:
>>>>>>
>>>>>> https://malhar.atlassian.net/browse/MLHR-1951
>>>>>>
>>>>>> Thanks,
>>>>>> Tim
>>
>> --
>>
>> Regards,
>> Ashwin.
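The spool-on-failure behavior Tim proposes in the ticket can be sketched as below. This is a simplified, hypothetical illustration, not the MLHR-1951 implementation: the `Database` interface and `SpoolOnFailureWriter` names are invented, and an in-memory deque stands in for the HDFS-backed WAL.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: write straight to the database while the connection is
// healthy, spool to a WAL when it is lost, and replay the WAL in order once
// the database comes back, instead of failing and redeploying the operator.
public class SpoolOnFailureWriter {
    public interface Database {
        boolean isUp();
        void insert(String tuple);
    }

    private final Database db;
    private final Deque<String> wal = new ArrayDeque<>(); // stand-in for an HDFS-backed WAL

    public SpoolOnFailureWriter(Database db) {
        this.db = db;
    }

    public void process(String tuple) {
        if (db.isUp()) {
            replayWal();          // drain any backlog first to preserve order
            db.insert(tuple);
        } else {
            wal.addLast(tuple);   // connection lost: spool instead of failing
        }
    }

    // Replays spooled tuples once the connection is healthy again.
    public void replayWal() {
        while (db.isUp() && !wal.isEmpty()) {
            db.insert(wal.pollFirst());
        }
    }

    public int walSize() {
        return wal.size();
    }
}
```

A real implementation would also need to make the WAL fault-tolerant across operator redeploys (e.g. by checkpointing the WAL offsets), which is the part that keeps the at-least-once guarantee intact.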