Mailing-List: contact dev-help@apex.incubator.apache.org; run by ezmlm
Reply-To: dev@apex.incubator.apache.org
MIME-Version: 1.0
In-Reply-To: <20160126214640.D4D99E00D6@git1-us-west.apache.org>
References: <20160126214640.D4D99E00D6@git1-us-west.apache.org>
Date: Tue, 26 Jan 2016 16:52:55 -0500
Subject: Re: [GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...
From: Bryan Bende
To: dev@apex.incubator.apache.org
Content-Type: multipart/alternative; boundary=001a1143ff3442fd41052a43b524

--001a1143ff3442fd41052a43b524
Content-Type: text/plain; charset=UTF-8

Seems like top-level comments on pull requests don't generate emails, so
sending this here just as an FYI...

I updated this pull request based on everyone's feedback. The biggest
change is adding the WindowDataManager to ensure data is saved before we
complete a NiFi transaction, because once the transaction completes, any
data we lose is gone for good. I based this mostly on the RabbitMQ
operators, but let me know if anything doesn't seem right.

Thanks,

Bryan

On Tue, Jan 26, 2016 at 4:46 PM, bbende wrote:

> Github user bbende commented on a diff in the pull request:
>
>
> https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r50905617
>
> --- Diff:
> contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
> ---
> @@ -0,0 +1,162 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package com.datatorrent.contrib.nifi;
> +
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import org.apache.nifi.remote.Transaction;
> +import org.apache.nifi.remote.TransferDirection;
> +import org.apache.nifi.remote.client.SiteToSiteClient;
> +import org.apache.nifi.remote.protocol.DataPacket;
> +
> +import com.datatorrent.api.Context;
> +import com.datatorrent.api.InputOperator;
> +
> +/**
> + * This is the base implementation of a NiFi input operator.
> + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
> + *

> + * Ports:
> + * Input: No input port
> + * Output: Can have any number of output ports
> + *
> + * Properties:
> + * None
> + *
> + * Compile time checks:
> + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)
> + * and createTuple(DataPacket dp)
> + *
> + * Run time checks:
> + * None
> + *
> + * Benchmarks:
> + * TBD
> + *

> + *
> + * @displayName Abstract NiFi Input
> + * @category Messaging
> + * @tags input operator
> + * @since 3.3.0
> + */
> +
> +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
> +{
> +
> +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
> +
> +  private SiteToSiteClient client;
> +  private final SiteToSiteClient.Builder siteToSiteBuilder;
> +
> +  /**
> +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
> +   */
> +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
> +  {
> +    this.siteToSiteBuilder = siteToSiteBuilder;
> +  }
> +
> +  @Override
> +  public void setup(Context.OperatorContext context)
> +  {
> +    this.client = siteToSiteBuilder.build();
> +  }
> +
> +  @Override
> +  public void teardown()
> +  {
> +    try {
> +      client.close();
> +    } catch (IOException e) {
> +      LOGGER.error(e.getMessage(), e);
> +    }
> +  }
> +
> +  @Override
> +  public void emitTuples()
> +  {
> +    try {
> +      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
> +      if (transaction == null) {
> +        LOGGER.warn("A transaction could not be created, returning...");
> +        return;
> +      }
> +
> +      DataPacket dataPacket = transaction.receive();
> +      if (dataPacket == null) {
> +        transaction.confirm();
> +        transaction.complete();
> +        LOGGER.debug("No data available to pull, returning and will try again...");
> +        return;
> +      }
> +
> +      // read all of the available data packets and convert to the given type
> +      final List<T> tuples = new ArrayList<>();
> +      do {
> +        tuples.add(createTuple(dataPacket));
> +        dataPacket = transaction.receive();
> +      } while (dataPacket != null);
> --- End diff --
>
> shouldn't be able to get an infinite loop...
> transaction.receive() will return null when there is no more data to pull,
> or when it has pulled the maximum number of data packets for a transaction
> (configured on the site-to-site client)
>
>
> ---
> If your project is set up for it, you can reply to this email and have your
> reply appear on GitHub as well. If your project does not have this feature
> enabled and wishes so, or if the feature is enabled but not working, please
> contact infrastructure at infrastructure@apache.org or file a JIRA ticket
> with INFRA.
> ---
>

--001a1143ff3442fd41052a43b524--
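
For anyone who wants to convince themselves that the receive loop terminates, here is a minimal, self-contained sketch of the behaviour described in the review comment. It does not use the NiFi client at all: PacketSource and FakeTransaction are hypothetical stand-ins I made up for illustration, and the only assumption carried over from the comment is that receive() returns null either when no data is left or when the per-transaction maximum has been reached.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class ReceiveLoopSketch {

  /** Hypothetical stand-in for the single transaction method the loop relies on. */
  interface PacketSource {
    /** Returns the next packet, or null when this transaction has nothing more to give. */
    String receive();
  }

  /**
   * Fake transaction (not the NiFi API): serves packets from a shared queue,
   * but never more than maxBatch per transaction.
   */
  static final class FakeTransaction implements PacketSource {
    private final Deque<String> pending;
    private final int maxBatch;
    private int served = 0;

    FakeTransaction(Deque<String> pending, int maxBatch) {
      this.pending = pending;
      this.maxBatch = maxBatch;
    }

    @Override
    public String receive() {
      // null on either condition: batch limit reached, or no data left
      if (served >= maxBatch || pending.isEmpty()) {
        return null;
      }
      served++;
      return pending.poll();
    }
  }

  /** Same shape as the loop in the diff: keep receiving until null comes back. */
  static List<String> drain(PacketSource transaction) {
    final List<String> tuples = new ArrayList<>();
    String packet = transaction.receive();
    while (packet != null) {
      tuples.add(packet);
      packet = transaction.receive();
    }
    return tuples;
  }

  public static void main(String[] args) {
    Deque<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c", "d", "e"));
    // Each call models one emitTuples() invocation with a fresh transaction.
    System.out.println(drain(new FakeTransaction(queue, 3)));
    System.out.println(drain(new FakeTransaction(queue, 3)));
    System.out.println(drain(new FakeTransaction(queue, 3)));
  }
}
```

With five queued packets and a batch limit of three, the first call drains three packets, the second drains the remaining two, and the third returns an empty list. Each loop is bounded by the batch limit even if the upstream queue keeps growing.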