Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 63FA3200C8E for ; Thu, 8 Jun 2017 10:05:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 60A43160BD5; Thu, 8 Jun 2017 08:05:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0C89C160BCA for ; Thu, 8 Jun 2017 10:05:39 +0200 (CEST) Received: (qmail 55396 invoked by uid 500); 8 Jun 2017 08:05:39 -0000 Mailing-List: contact users-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@apex.apache.org Delivered-To: mailing list users@apex.apache.org Received: (qmail 55386 invoked by uid 99); 8 Jun 2017 08:05:38 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jun 2017 08:05:38 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 64E781A7B5F for ; Thu, 8 Jun 2017 08:05:38 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2 X-Spam-Level: ** X-Spam-Status: No, score=2 tagged_above=-999 required=6.31 tests=[HTML_MESSAGE=2, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id HRiXPo69ksjg for ; Thu, 8 Jun 2017 08:05:31 +0000 (UTC) Received: from dest-unreachable.net (mail.dest-unreachable.net [78.41.116.115]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 085D35F6BF for ; Thu, 8 Jun 2017 08:05:30 +0000 (UTC) Received: from dest-unreachable.net (localhost [127.0.0.1]) by dest-unreachable.net (Postfix) with ESMTP id C38C140161C7 for ; Thu, 8 Jun 2017 10:05:30 +0200 (CEST) Received: by dest-unreachable.net (Postfix, from userid 109) id B4F0340161CA; Thu, 8 Jun 2017 10:05:30 +0200 (CEST) Received: from [192.168.33.36] (80-108-7-90.cable.dynamic.surfer.at [80.108.7.90]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by dest-unreachable.net (Postfix) with ESMTPSA id 6859E40161C7 for ; Thu, 8 Jun 2017 10:05:24 +0200 (CEST) From: apex@x5h.eu Subject: Re: Apex and RabbitMQ problems with the input operator To: users@apex.apache.org References: <271b9a08-2274-f19e-e21f-73bbfb35bf28@x5h.eu> <433bcae7-2d27-3d83-70f9-4e51eb34f68f@x5h.eu> Message-ID: Date: Thu, 8 Jun 2017 10:05:18 +0200 User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 Thunderbird/52.1.1 MIME-Version: 1.0 In-Reply-To: Content-Type: multipart/alternative; boundary="------------96A02F9EC7451A493718A1B4" Content-Language: en-GB X-Virus-Scanned: ClamAV using ClamSMTP archived-at: Thu, 08 Jun 2017 08:05:41 -0000 This is a multi-part message in MIME format. --------------96A02F9EC7451A493718A1B4 Content-Type: text/plain; charset=utf-8 X-Clacks-Overhead: GNU Terry Pratchett Content-Transfer-Encoding: 7bit Sorry the two Snippets below where from different iterations. The Error I get is the following: [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol [ERROR] symbol: variable output [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator My Code is as follows: package com.example.rabbitMQ; import org.apache.hadoop.conf.Configuration; import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG; import com.datatorrent.lib.io.jms.JMSStringInputOperator; @ApplicationAnnotation(name="RabbitMQ2HDFS") public class RabbitMQApplication implements StreamingApplication { @Override public void populateDAG(DAG dag, Configuration conf) { RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class); rabbitInput.setHost("localhost"); rabbitInput.setPort(5672); rabbitInput.setExchange(""); rabbitInput.setQueueName("hello"); LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator()); dag.addStream("data", rabbitInput.output, out.input); } } Cheers Manfred. Am 08.06.2017 um 04:34 schrieb vikram patil: > Hi, > dag.addStream() is actually used to create stream of from one Operator output port to other operators input port. > RabbitMQInputOperator consumer = dag.addOperator("Consumer", > RabbitMQInputOperator.class); > dag.addStream("data", *rabbitInput*.output, out.input); > Looks like your operator name is incorrect? I see in your code snippet > above, name of of RabbiMQInputOperator is *"Consumer".* > > In property name, you need to provide operator name you have specified > in addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class) > api call. > > dt.operator.*rabbitMQIn*.prop.tuple_blast ( Syntax is correct > correct given your operator name is correct ). > > ( It should be dt.operator.*Consumer*.prop.tuple_blast based on your > code snippet ). > > I think tests which are provided in the Apache Malhar are very > detailed, they run in local mode as unit tests so we have mocked > actual rabbitmq by custom message publisher. > > For timebeing you set only queuename and hostname as > > // set your rabbitmq host . > consumer.setHost("localhost"); // set your rabbitmq port > consumer.setPort(5672) // It depends on your rabbitmq producer > configuration but by default // its default exchange with "" ( No Name > is provided ). consumer.setExchange(""); // set your queue name > consumer.setQueueName("YOUR_QUEUE_NAME") > > > > > If its okay, could you please share code from your application.java > and properties.xml here? > > Thanks, > Vikram > > > On Thu, Jun 8, 2017 at 12:32 AM, > wrote: > > Thanks very much for the help. The only problem left is that I > don't quite understand dag.addstream(). > > I tried this > > RabbitMQInputOperator consumer = dag.addOperator("Consumer", > RabbitMQInputOperator.class); > dag.addStream("data", rabbitInput.output, out.input); > > but obviously this doesn't work. What I don't get is the > difference between the ActiveMQ example and the RabbitMQ example. > I looked over the test examples for RabbitMQ but don't seem to > understand the logic behind it. > > Is this the correct wax to specify properties: > > dt.operator.rabbitMQIn.prop.tuple_blast > 500 > > > Cheers > Manfred. > > > Am 07.06.2017 um 12:03 schrieb Vikram Patil: >> Yes, you would need Application.java which will be way to define a DAG >> for Apex Application. >> >> Please have look at the code from following example to find out how to >> write JMS ActiveMQ based example: >> >> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ >> >> >> This is how you can instantiate RabbitMQINputOperator and to a dag. >> RabbitMQInputOperator consumer = dag.addOperator("Consumer", >> RabbitMQInputOperator.class); >> >> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag >> >> >> Following properties need to be specified in properties.xml >> >> * Properties:
>> * tuple_blast: Number of tuples emitted in each burst
>> * bufferSize: Size of holding buffer
>> * host:the address for the consumer to connect to rabbitMQ producer
>> * exchange:the exchange for the consumer to connect to rabbitMQ >> producer
>> * exchangeType:the exchangeType for the consumer to connect to >> rabbitMQ producer
>> * routingKey:the routingKey for the consumer to connect to >> rabbitMQ producer
>> * queueName:the queueName for the consumer to connect to >> rabbitMQ producer
>> *
>> >> Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java >> >> >> Thanks, >> Vikram >> >> On Wed, Jun 7, 2017 at 3:19 PM, wrote: >>> Hello, >>> >>> I compiled the whole thing but now I don't know exactly how to get it >>> running in Apex. Do I need an application.java like in the tutorial? I do >>> have a simple RabbitMQ queue up and running on the server. How do I consume >>> the messages with Apex and write them to hdfs? >>> >>> Cheers, >>> >>> Manfred >>> >>> Following steps were necessary to get the RabbitMq test to compile >>> >>> @TimeoutException >>> import java.util.concurrent.TimeoutException; >>> public void setup() throws IOException,TimeoutException >>> public void teardown() throws IOException,TimeoutException >>> protected void runTest(final int testNum) throws IOException >>> >>> @Build jars >>> cd apex-malhar/contrib/ >>> mvn clean package -DskipTests >>> >>> cd apex-malhar/library/ >>> mvn clean package -DskipTests >>> copy packages to project directory >>> >>> @Link them to the project >>> Add following lines to the pom.xml >>> >>> contrib >>> com.datatorrent.contrib.helper >>> 1.0 >>> system >>> >>> ${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar >>> >>> >>> lib >>> com.datatorrent.lib.helper >>> 1.0 >>> system >>> >>> ${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar >>> >>> >>> contrib >>> com.datatorrent.contrib.rabbitmq >>> 1.0 >>> system >>> >>> ${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar >>> >>> >>> Attribute >>> com.datatorrent.api.Attribute.AttributeMap >>> 1.0 >>> system >>> >>> ${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar >>> >>> >>> >>> Am 31.05.2017 um 18:57 schrieb Sanjay Pujare: >>> >>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are >>> under the test directory under malhar-contrib and malhar-library >>> respectively. You may need to build these jars yourself with test scope to >>> include these packages. >>> >>> On Wed, May 31, 2017 at 9:39 AM, wrote: >>>> Hello, (mea culpa for messing up the headline the first time) >>>> >>>> I'm currently trying to get the apex-malhar rabbitmq running. But I'm at a >>>> complete loss, while the examples are running fine I don't even get the >>>> RabbitMQInputOperatorTest.java to run. >>>> >>>> First it couldn't find the rabbitmq-client which was solveable by adding >>>> the dependency: >>>> >>>> >>>> com.rabbitmq >>>> amqp-client >>>> 4.1.0 >>>> >>>> >>>> But now it doesn't find the packages com.datatorrent.contrib.helper and >>>> com.datatorrent.lib.helper and can't find several symbols. >>>> >>>> Needless to say that I'm a beginner regarding Apex so does anyone know >>>> what exactly I'm doing wrong here? >>>> >>>> Cheers >>>> >>>> Manfred. >>>> >>>> > > --------------96A02F9EC7451A493718A1B4 Content-Type: text/html; charset=utf-8 X-Clacks-Overhead: GNU Terry Pratchett Content-Transfer-Encoding: 8bit

Sorry the two Snippets below where from different iterations. The Error I get is the following:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My Code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
}
}


Cheers

Manfred.



Am 08.06.2017 um 04:34 schrieb vikram patil:
Hi,
dag.addStream() is actually used to create stream of from one Operator output port to other operators input port.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 
Looks like your operator name is incorrect? I see in your code snippet above, name of of RabbiMQInputOperator is "Consumer".

In property name, you need to provide operator name you have specified in addOperator("NAME OF THE OPERATOR", RabbitMQInputOperator.class) api call.
 
     dt.operator.rabbitMQIn.prop.tuple_blast ( Syntax is correct correct given your operator name is correct ). 

( It should be 
dt.operator.Consumer.prop.tuple_blast based on your code snippet ).

I think tests which are provided in the Apache Malhar are very detailed, they run in local mode as unit tests so we have mocked actual rabbitmq by custom message publisher. 

For timebeing you set only queuename and hostname as

   //  set your rabbitmq host . 
consumer.setHost("localhost"); // set your rabbitmq port consumer.setPort(5672) // It depends on your rabbitmq producer configuration but by default // its default exchange with "" ( No Name is provided ). consumer.setExchange(""); // set your queue name consumer.setQueueName("YOUR_QUEUE_NAME")


If its okay, could you please share code from your application.java and properties.xml here?

Thanks,
Vikram


On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu> wrote:

Thanks very much for the help. The only problem left is that I don't quite understand dag.addstream().

I tried this

RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 

but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the test examples for RabbitMQ but don't seem to understand the logic behind it.

Is this the correct wax to specify properties:
  <property>
    <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
    <value>500</value>
  </property>

Cheers
Manfred.


Am 07.06.2017 um 12:03 schrieb Vikram Patil:
Yes, you would need Application.java which will be way to define a DAG
for Apex Application.

Please have look at the code from following example to find out how to
write JMS ActiveMQ based example:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ

This is how you can instantiate RabbitMQINputOperator and to a dag.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);

https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag

Following properties need to be specified in properties.xml

* Properties:<br>
* <b>tuple_blast</b>: Number of tuples emitted in each burst<br>
* <b>bufferSize</b>: Size of holding buffer<br>
* <b>host</b>:the address for the consumer to connect to rabbitMQ producer<br>
* <b>exchange</b>:the exchange for the consumer to connect to rabbitMQ
producer<br>
* <b>exchangeType</b>:the exchangeType for the consumer to connect to
rabbitMQ producer<br>
* <b>routingKey</b>:the routingKey for the consumer to connect to
rabbitMQ producer<br>
* <b>queueName</b>:the queueName for the consumer to connect to
rabbitMQ producer<br>
* <br>

Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

Thanks,
Vikram

On Wed, Jun 7, 2017 at 3:19 PM,  <apex@x5h.eu> wrote:
Hello,

I compiled the whole thing but now I don't know exactly how to get it
running in Apex. Do I need an application.java like in the tutorial? I do
have a simple RabbitMQ queue up and running on the server. How do I consume
the messages with Apex and write them to hdfs?

Cheers,

Manfred

Following steps were necessary to get the RabbitMq test to compile

@TimeoutException
import java.util.concurrent.TimeoutException;
public void setup() throws IOException,TimeoutException
public void teardown() throws IOException,TimeoutException
protected void runTest(final int testNum) throws IOException

@Build jars
cd apex-malhar/contrib/
mvn clean package -DskipTests

cd apex-malhar/library/
mvn clean package -DskipTests
copy packages to project directory

@Link them to the project
Add following lines to the pom.xml
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>lib</groupId>
    <artifactId>com.datatorrent.lib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
    <groupId>Attribute</groupId>
    <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
</dependency>


Am 31.05.2017 um 18:57 schrieb Sanjay Pujare:

Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper are
under the test directory under malhar-contrib and malhar-library
respectively. You may need to build these jars yourself with test scope to
include these packages.

On Wed, May 31, 2017 at 9:39 AM, <apex@x5h.eu> wrote:
Hello, (mea culpa for messing up the headline the first time)

I'm currently trying to get the apex-malhar rabbitmq running. But I'm at a
complete loss, while the examples are running fine I don't even get the
RabbitMQInputOperatorTest.java to run.

First it couldn't find the rabbitmq-client which was solveable by adding
the dependency:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
  </dependency>

But now it doesn't find the packages com.datatorrent.contrib.helper and
com.datatorrent.lib.helper and can't find several symbols.

Needless to say that I'm a beginner regarding Apex so does anyone know
what exactly I'm doing wrong here?

Cheers

Manfred.





--------------96A02F9EC7451A493718A1B4--