apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bryan Bende <bbe...@gmail.com>
Subject Re: [GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...
Date Tue, 26 Jan 2016 21:52:55 GMT
Seems like top-level comments on pull requests don't generate emails, so
sending this here just as an FYI...

I updated this pull request based on everyone's feedback. The biggest
change here is adding the WindowDataManager to ensure data is saved before
we complete a NiFi transaction, at which point it would be gone if we lost
it. I based this mostly off looking at the RabbitMQ operators, but let me
know if anything doesn't seem right.

Thanks,

Bryan

On Tue, Jan 26, 2016 at 4:46 PM, bbende <git@git.apache.org> wrote:

> Github user bbende commented on a diff in the pull request:
>
>
> https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r50905617
>
>     --- Diff:
> contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
> ---
>     @@ -0,0 +1,162 @@
>     +/**
>     + * Licensed to the Apache Software Foundation (ASF) under one
>     + * or more contributor license agreements.  See the NOTICE file
>     + * distributed with this work for additional information
>     + * regarding copyright ownership.  The ASF licenses this file
>     + * to you under the Apache License, Version 2.0 (the
>     + * "License"); you may not use this file except in compliance
>     + * with the License.  You may obtain a copy of the License at
>     + *
>     + *   http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing,
>     + * software distributed under the License is distributed on an
>     + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>     + * KIND, either express or implied.  See the License for the
>     + * specific language governing permissions and limitations
>     + * under the License.
>     + */
>     +package com.datatorrent.contrib.nifi;
>     +
>     +import java.io.IOException;
>     +import java.util.ArrayList;
>     +import java.util.List;
>     +
>     +import org.slf4j.Logger;
>     +import org.slf4j.LoggerFactory;
>     +
>     +import org.apache.nifi.remote.Transaction;
>     +import org.apache.nifi.remote.TransferDirection;
>     +import org.apache.nifi.remote.client.SiteToSiteClient;
>     +import org.apache.nifi.remote.protocol.DataPacket;
>     +
>     +import com.datatorrent.api.Context;
>     +import com.datatorrent.api.InputOperator;
>     +
>     +/**
>     + * This is the base implementation of a NiFi input operator.&nbsp;
>     + * Subclasses should implement the methods which convert NiFi
> DataPackets to tuples and emit them.
>     + * <p>
>     + * Ports:<br>
>     + * <b>Input</b>: No input port<br>
>     + * <b>Output</b>: Can have any number of output ports<br>
>     + * <br>
>     + * Properties:<br>
>     + * None<br>
>     + * <br>
>     + * Compile time checks:<br>
>     + * Classes derived from this have to implement the abstract methods
> emitTuples(List<T> tuples)&nbsp;
>     + * and createTuple(DataPacket dp)<br>
>     + * <br>
>     + * Run time checks:<br>
>     + * None<br>
>     + * <br>
>     + * Benchmarks:<br>
>     + * TBD<br>
>     + * </p>
>     + *
>     + * @displayName Abstract NiFi Input
>     + * @category Messaging
>     + * @tags input operator
>     + * @since 3.3.0
>     + */
>     +
>     +public abstract class AbstractNiFiInputOperator<T> implements
> InputOperator
>     +{
>     +
>     +  private static final Logger LOGGER =
> LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
>     +
>     +  private SiteToSiteClient client;
>     +  private final SiteToSiteClient.Builder siteToSiteBuilder;
>     +
>     +  /**
>     +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
>     +   */
>     +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder
> siteToSiteBuilder)
>     +  {
>     +    this.siteToSiteBuilder = siteToSiteBuilder;
>     +  }
>     +
>     +  @Override
>     +  public void setup(Context.OperatorContext context)
>     +  {
>     +    this.client = siteToSiteBuilder.build();
>     +  }
>     +
>     +  @Override
>     +  public void teardown()
>     +  {
>     +    try {
>     +      client.close();
>     +    } catch (IOException e) {
>     +      LOGGER.error(e.getMessage(), e);
>     +    }
>     +  }
>     +
>     +  @Override
>     +  public void emitTuples()
>     +  {
>     +    try {
>     +      final Transaction transaction =
> client.createTransaction(TransferDirection.RECEIVE);
>     +      if (transaction == null) {
>     +        LOGGER.warn("A transaction could not be created,
> returning...");
>     +        return;
>     +      }
>     +
>     +      DataPacket dataPacket = transaction.receive();
>     +      if (dataPacket == null) {
>     +        transaction.confirm();
>     +        transaction.complete();
>     +        LOGGER.debug("No data available to pull, returning and will
> try again...");
>     +        return;
>     +      }
>     +
>     +      // read all of the available data packets and convert to the
> given type
>     +      final List<T> tuples = new ArrayList<>();
>     +      do {
>     +        tuples.add(createTuple(dataPacket));
>     +        dataPacket = transaction.receive();
>     +      } while (dataPacket != null);
>     --- End diff --
>
>     shouldn't be able to get an infinite loop... transaction.receive()
> will return null when there is no matter data to pull, or when it is has
> pulled the maximum number of data packets for a transaction (configured on
> the site-to-site client)
>
>
> ---
> If your project is set up for it, you can reply to this email and have your
> reply appear on GitHub as well. If your project does not have this feature
> enabled and wishes so, or if the feature is enabled but not working, please
> contact infrastructure at infrastructure@apache.org or file a JIRA ticket
> with INFRA.
> ---
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message