From: Bhupesh Chawda
Date: Thu, 27 Apr 2017 09:16:35 +0530
Subject: Re: [Design] - Kudu Output Operator
To: dev

Hi,

I will be merging the PR https://github.com/apache/apex-malhar/pull/486
soon unless there are any comments from the community.

~ Bhupesh

_______________________________________________________

Bhupesh Chawda

E: bhupesh@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com | apex.apache.org

On Sat, Apr 8, 2017 at 2:06 AM, Amol Kekre wrote:

> Ananth,
> This is a good proposal. We will work with you.
>
> Thks
> Amol
>
>
> E: amol@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
>
> www.datatorrent.com
>
>
> On Sat, Apr 1, 2017 at 4:29 PM, ananth wrote:
>
> > Hello All,
> >
> > I would like the community's opinion on the implementation of the Kudu
> > output operator. A first-cut implementation was made available in
> > November last year, but I guess we did not get time to discuss it
> > thoroughly on the mailing list, and hence the PR did not get merged.
> >
> > This operator would allow Apex to stream data into Kudu. A brief
> > description of Kudu is here: https://kudu.apache.org/. At a high level,
> > this would enable the following use cases from the Apex point of view:
> >
> > - Low-latency writes into the Kudu store, which can then be queried with
> > SQL. In effect, data updates become available for SQL querying within a
> > second. Unlike Parquet-style data dumps, which typically need a few
> > minutes of accumulated data to take advantage of the Parquet format,
> > this would enable same-second queries on very large datasets stored in
> > Kudu, using Impala.
> >
> > - Another very interesting use case would be to use Kudu as a source
> > store, streaming data out of it based on SQL queries.
> > The Kudu input operator is covered by a separate JIRA
> > (https://issues.apache.org/jira/browse/APEXMALHAR-2472) and would provide
> > mechanisms to stream data from Kudu into Apex. If Kudu is part of the
> > ecosystem in a given setup, this opens up interesting use cases such as
> > de-duplication, selective streaming, and a different way of handling
> > out-of-band data.
> >
> > Here is the design of the Kudu output operator:
> >
> > 1. The operator would be an AbstractOperator and would allow concrete
> > implementations to set a few behavioral aspects of the operator.
> >
> > 2. The following are the major phases of the operator:
> > During the activate() phase: Establish a connection to the cluster and
> > get the metadata of the table that is being used as the sink.
> > During the setup() phase: Fetch the current window information and use
> > it to decide whether we are recovering from a failure (see point 8
> > below).
> > During process() of the input port: Inspect the incoming
> > ExecutionContext tuple (see below) and perform one of the operations
> > (Insert/Update/Delete/Upsert).
> >
> > 3. The following parameters are tunable while establishing a Kudu
> > connection: table name, boss worker threads, worker threads, socket read
> > timeouts, and external consistency mode.
> >
> > 4. The user need not specify a schema up front. The POJO fields are
> > automatically mapped to the table column names identified when the
> > schema is parsed in the activate phase.
> >
> > 5. Concrete implementations of the operator can override the mapping of
> > a POJO field name to a table column name. This allows flexibility in
> > cases where the table column names are not compatible with Java bean
> > frameworks, or where the column names cannot be controlled because the
> > POJO comes from an upstream operator.
> >
> > 6. The input tuple supplied to this operator is of type "Kudu Execution
> > Context".
> > This tuple wraps the actual POJO that is going to be persisted to the
> > Kudu store. Additionally, it allows the upstream operator to specify the
> > operation that needs to be performed. One of the following operations
> > is permitted as part of the context: Insert, Upsert, Update, or Delete
> > on the POJO that acts as the payload of the Execution Context.
> >
> > 7. The concrete implementation of the operator would allow the user to
> > specify the actual POJO class definition that would be used to write to
> > the table. The execution context would contain this POJO as well as the
> > metadata that defines the processing to be done on that tuple.
> >
> > 8. The operator would allow a special execution mode for the first
> > window that is processed after the operator gets activated. There are
> > two modes for the operator's first window of processing:
> >    a. Safe Mode: Safe mode is the "happy path execution", in that no
> >       extra processing is required to perform the Kudu mutation.
> >    b. Reconciling Mode: An additional function is called to check
> >       whether the user would like the tuple to be used for mutation.
> >       This mode is set automatically when
> >       OperatorContext.ACTIVATION_WINDOW_ID != Stateless.WINDOW_ID during
> >       the operator's first window of processing.
> >
> > This feature is useful when the operator is recovering from a crash of
> > the application and we do not want to perform multiple mutations of the
> > same tuple, given that ATLEAST_ONCE is the default semantics.
> >
> > 9. The operator is a stateless operator.
> >
> > 10. The operator would generate the following autometrics:
> >    a. Counts of Inserts, Upserts, Deletes and Updates (a separate counter
> >       for each mutation type) for a given window
> >    b. Bytes written in a given window
> >    c. Write RPCs in the given window
> >    d. Total RPC errors in the given window
> >    e. All of the above metrics for the entire lifetime of the operator
> >
> > Could you please share your thoughts on whether the above design looks
> > good?
> >
> > Regards,
> > Ananth
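
Point 3 of the design lists the connection parameters the operator would expose. One way to group and validate them is a single immutable settings object, sketched below. The class and field names are hypothetical and not taken from PR 486. The two enum values mirror the external consistency modes documented for the Kudu Java client (CLIENT_PROPAGATED and COMMIT_WAIT). Passing these values into the real Kudu client builder is deliberately left out.

```java
import java.util.Objects;

/**
 * Hypothetical holder for the tunable connection parameters from point 3.
 * Wiring these values into the real Kudu client builder is intentionally omitted.
 */
final class KuduConnectionSettings {

  /** Local stand-in for Kudu's external consistency modes. */
  enum ConsistencyMode { CLIENT_PROPAGATED, COMMIT_WAIT }

  final String tableName;
  final int bossThreads;
  final int workerThreads;
  final long socketReadTimeoutMs;
  final ConsistencyMode consistencyMode;

  KuduConnectionSettings(String tableName, int bossThreads, int workerThreads,
                         long socketReadTimeoutMs, ConsistencyMode consistencyMode) {
    // Fail fast at construction so a misconfigured operator never reaches activate().
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("tableName must be set");
    }
    if (bossThreads <= 0 || workerThreads <= 0) {
      throw new IllegalArgumentException("thread counts must be positive");
    }
    if (socketReadTimeoutMs <= 0) {
      throw new IllegalArgumentException("socket read timeout must be positive");
    }
    this.tableName = tableName;
    this.bossThreads = bossThreads;
    this.workerThreads = workerThreads;
    this.socketReadTimeoutMs = socketReadTimeoutMs;
    this.consistencyMode = Objects.requireNonNull(consistencyMode, "consistencyMode");
  }

  @Override
  public String toString() {
    return tableName + " boss=" + bossThreads + " workers=" + workerThreads
        + " readTimeoutMs=" + socketReadTimeoutMs + " mode=" + consistencyMode;
  }

  public static void main(String[] args) {
    KuduConnectionSettings settings = new KuduConnectionSettings(
        "events", 1, 4, 10_000L, ConsistencyMode.CLIENT_PROPAGATED);
    System.out.println(settings);
  }
}
```

Validating in the constructor keeps the operator's later phases free of defensive checks.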
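
Points 4 and 5 describe mapping POJO fields to table columns automatically, with an override for columns whose names cannot be Java field names. The reflection-based sketch below shows one possible approach. It assumes the column names have already been read from the table schema in activate(), and here they are simply a list of strings. The names mapColumns, extractRow, and the override map keyed by column name are illustrative, not the operator's actual API.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the POJO-to-column mapping from points 4 and 5. */
final class PojoColumnMapperSketch {

  /**
   * Maps each table column to a POJO field. By default a column maps to the field with
   * the same name; overrides (column name -> field name) cover the other cases.
   */
  static Map<String, Field> mapColumns(Class<?> pojoClass, List<String> columnNames,
                                       Map<String, String> overrides)
      throws NoSuchFieldException {
    Map<String, Field> mapping = new LinkedHashMap<>();
    for (String column : columnNames) {
      String fieldName = overrides.getOrDefault(column, column);
      // Only fields declared directly on pojoClass are found; throws if none matches.
      Field field = pojoClass.getDeclaredField(fieldName);
      field.setAccessible(true);
      mapping.put(column, field);
    }
    return mapping;
  }

  /** Reads the POJO's values in column order, e.g. to populate a row before writing. */
  static Map<String, Object> extractRow(Object pojo, Map<String, Field> mapping)
      throws IllegalAccessException {
    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, Field> entry : mapping.entrySet()) {
      row.put(entry.getKey(), entry.getValue().get(pojo));
    }
    return row;
  }

  /** Example POJO; the table column "temp-c" is not a legal Java identifier. */
  static final class Reading {
    String deviceId = "sensor-7";
    long eventTime = 1000L;
    double tempC = 21.5;
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> overrides = new HashMap<>();
    overrides.put("temp-c", "tempC");
    Map<String, Field> mapping = mapColumns(Reading.class,
        Arrays.asList("deviceId", "eventTime", "temp-c"), overrides);
    // prints {deviceId=sensor-7, eventTime=1000, temp-c=21.5}
    System.out.println(extractRow(new Reading(), mapping));
  }
}
```

Resolving the mapping once, rather than per tuple, keeps reflection lookups out of the hot path. Supporting inherited fields would require walking getSuperclass().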
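
Points 6 and 7 describe the Execution Context tuple: a POJO payload plus the mutation to apply to it. A minimal generic sketch follows; the class, enum, and method names are hypothetical. In a real implementation each branch of the dispatch would presumably build the matching Kudu client operation (the Kudu Java client's KuduTable provides newInsert(), newUpsert(), newUpdate(), and newDelete()). Here that step is replaced by a returned label so the example runs without Kudu.

```java
import java.util.Objects;

/** Hypothetical sketch of the "Kudu Execution Context" tuple from points 6 and 7. */
final class KuduExecutionContextSketch {

  /** The four mutations the design permits. */
  enum MutationType { INSERT, UPSERT, UPDATE, DELETE }

  /** Wraps one payload POJO together with the mutation requested by the upstream operator. */
  static final class ExecutionContext<T> {
    private final T payload;
    private final MutationType mutationType;

    ExecutionContext(T payload, MutationType mutationType) {
      this.payload = Objects.requireNonNull(payload, "payload");
      this.mutationType = Objects.requireNonNull(mutationType, "mutationType");
    }

    T getPayload() { return payload; }
    MutationType getMutationType() { return mutationType; }
  }

  /** Example payload; the concrete operator would fix the POJO class via the type parameter. */
  static final class Trade {
    final long tradeId;
    final String symbol;
    final double price;

    Trade(long tradeId, String symbol, double price) {
      this.tradeId = tradeId;
      this.symbol = symbol;
      this.price = price;
    }
  }

  /**
   * Stand-in for process(): selects the mutation to apply. A real operator would build
   * the corresponding Kudu operation here instead of returning a label.
   */
  static String mutationLabel(ExecutionContext<?> ctx) {
    switch (ctx.getMutationType()) {
      case INSERT: return "insert";
      case UPSERT: return "upsert";
      case UPDATE: return "update";
      case DELETE: return "delete";
      default: throw new IllegalStateException("unknown mutation " + ctx.getMutationType());
    }
  }

  public static void main(String[] args) {
    ExecutionContext<Trade> ctx =
        new ExecutionContext<>(new Trade(42L, "ABC", 10.5), MutationType.UPSERT);
    // prints "upsert trade 42"
    System.out.println(mutationLabel(ctx) + " trade " + ctx.getPayload().tradeId);
  }
}
```

Keeping the mutation type next to the payload lets one input port carry all four operations instead of needing a port per operation.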
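
The safe versus reconciling behavior from point 8 can be sketched as below. The sketch assumes setup() receives the activation window id directly (in Apex it would come from the operator context's ACTIVATION_WINDOW_ID attribute). It also assumes the user's reconciliation function is a simple predicate. STATELESS_WINDOW_ID is a local stand-in for Stateless.WINDOW_ID, and its -1 value is an assumption. The list of written tuples stands in for actual Kudu mutations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of the first-window safe/reconciling modes from point 8. */
final class ReconcilingWriterSketch {

  /** Stand-in for Stateless.WINDOW_ID; the value -1 is assumed, not taken from Apex. */
  static final long STATELESS_WINDOW_ID = -1L;

  private final Predicate<String> shouldWriteWhileReconciling; // user-supplied hook
  private final List<String> written = new ArrayList<>();      // stands in for Kudu mutations
  private boolean reconciling;

  ReconcilingWriterSketch(Predicate<String> shouldWriteWhileReconciling) {
    this.shouldWriteWhileReconciling = shouldWriteWhileReconciling;
  }

  /** Any activation window other than the stateless default means we are recovering. */
  void setup(long activationWindowId) {
    reconciling = activationWindowId != STATELESS_WINDOW_ID;
  }

  void process(String tuple) {
    if (reconciling && !shouldWriteWhileReconciling.test(tuple)) {
      return; // the user judged this tuple as already applied before the crash
    }
    written.add(tuple);
  }

  /** Reconciliation covers only the first window; afterwards the operator runs in safe mode. */
  void endWindow() {
    reconciling = false;
  }

  List<String> written() {
    return written;
  }

  public static void main(String[] args) {
    ReconcilingWriterSketch op = new ReconcilingWriterSketch(t -> !t.equals("a"));
    op.setup(100L);   // recovering: activation window differs from the default
    op.process("a");  // skipped by the user hook
    op.process("b");  // written
    op.endWindow();
    op.process("a");  // safe mode: written without consulting the hook
    System.out.println(op.written()); // prints [b, a]
  }
}
```

In safe mode the per-tuple cost of this check is a single boolean test, so the fresh-start path stays on the "happy path" the design describes.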
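
The autometrics in point 10 split naturally into per-window values, reset at the start of each window, and lifetime totals that only grow. A minimal sketch using plain fields and an EnumMap follows; all names are hypothetical. In Apex, such fields would typically be exposed by annotating them with @AutoMetric. That annotation is omitted here so the example has no Apex dependency.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical per-window and lifetime counters for the autometrics in point 10. */
final class KuduWriteMetrics {

  enum Mutation { INSERT, UPSERT, UPDATE, DELETE }

  private final Map<Mutation, Long> windowMutations = new EnumMap<>(Mutation.class);
  private final Map<Mutation, Long> lifetimeMutations = new EnumMap<>(Mutation.class);
  private long windowBytes;
  private long lifetimeBytes;
  private long windowRpcs;
  private long lifetimeRpcs;
  private long windowRpcErrors;
  private long lifetimeRpcErrors;

  /** Resets only the per-window counters; lifetime totals keep accumulating. */
  void beginWindow() {
    windowMutations.clear();
    windowBytes = 0;
    windowRpcs = 0;
    windowRpcErrors = 0;
  }

  /** One counter per mutation type, plus bytes written, at both granularities. */
  void recordMutation(Mutation type, long bytes) {
    windowMutations.merge(type, 1L, Long::sum);
    lifetimeMutations.merge(type, 1L, Long::sum);
    windowBytes += bytes;
    lifetimeBytes += bytes;
  }

  /** Write RPCs are counted separately because one RPC may carry several mutations. */
  void recordRpc(boolean failed) {
    windowRpcs++;
    lifetimeRpcs++;
    if (failed) {
      windowRpcErrors++;
      lifetimeRpcErrors++;
    }
  }

  long windowCount(Mutation type) { return windowMutations.getOrDefault(type, 0L); }
  long lifetimeCount(Mutation type) { return lifetimeMutations.getOrDefault(type, 0L); }
  long windowBytes() { return windowBytes; }
  long lifetimeBytes() { return lifetimeBytes; }
  long windowRpcs() { return windowRpcs; }
  long lifetimeRpcs() { return lifetimeRpcs; }
  long windowRpcErrors() { return windowRpcErrors; }
  long lifetimeRpcErrors() { return lifetimeRpcErrors; }

  public static void main(String[] args) {
    KuduWriteMetrics metrics = new KuduWriteMetrics();
    metrics.beginWindow();
    metrics.recordMutation(Mutation.INSERT, 100);
    metrics.recordMutation(Mutation.INSERT, 50);
    metrics.recordRpc(false);
    metrics.beginWindow();
    metrics.recordMutation(Mutation.DELETE, 20);
    metrics.recordRpc(true);
    // prints "window inserts=0 lifetime inserts=2 lifetime bytes=170"
    System.out.println("window inserts=" + metrics.windowCount(Mutation.INSERT)
        + " lifetime inserts=" + metrics.lifetimeCount(Mutation.INSERT)
        + " lifetime bytes=" + metrics.lifetimeBytes());
  }
}
```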