apex-users mailing list archives

From a...@x5h.eu
Subject Re: Apex and RabbitMQ problems with the input operator
Date Thu, 08 Jun 2017 08:05:18 GMT
Sorry, the two snippets below were from different iterations. The error
I get is the following:

[ERROR]
/home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type
com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput =
        dag.addOperator("Consumer", RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out =
        dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
  }
}

Cheers

Manfred.



On 08.06.2017 at 04:34, vikram patil wrote:
> Hi,
> dag.addStream() is used to create a stream from one operator's output
> port to another operator's input port.
> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
> RabbitMQInputOperator.class);
> dag.addStream("data", *rabbitInput*.output, out.input);
> It looks like your operator name may be incorrect. In your code snippet
> above, the name of the RabbitMQInputOperator is *"Consumer"*.
>
> In the property name, you need to provide the operator name you
> specified in the addOperator(*"NAME OF THE OPERATOR"*,
> RabbitMQInputOperator.class) API call.
>
>      dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is correct,
> provided the operator name is correct).
>
> (Based on your code snippet, it should be
> dt.operator.*Consumer*.prop.tuple_blast.)
>
> I think the tests provided in Apache Malhar are very detailed. They
> run in local mode as unit tests, so the actual RabbitMQ broker is
> mocked by a custom message publisher.
>
> For the time being, set only the host, port, exchange and queue name,
> as follows:
>
>    // Set your RabbitMQ host.
>    consumer.setHost("localhost");
>    // Set your RabbitMQ port.
>    consumer.setPort(5672);
>    // The exchange depends on your RabbitMQ producer configuration, but
>    // by default it is the default exchange "" (no name is provided).
>    consumer.setExchange("");
>    // Set your queue name.
>    consumer.setQueueName("YOUR_QUEUE_NAME");
>
> If it's okay, could you please share the code from your
> Application.java and properties.xml here?
>
> Thanks,
> Vikram
>
>
> On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu> wrote:
>
>     Thanks very much for the help. The only problem left is that I
>     don't quite understand dag.addStream().
>
>     I tried this
>
>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>     RabbitMQInputOperator.class);
>     dag.addStream("data", rabbitInput.output, out.input); 
>
>     but obviously this doesn't work. What I don't get is the
>     difference between the ActiveMQ example and the RabbitMQ example.
>     I looked over the test examples for RabbitMQ but don't seem to
>     understand the logic behind it.
>
>     Is this the correct way to specify properties?
>       <property>
>         <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>         <value>500</value>
>       </property>
>
>     Cheers
>     Manfred.
>
>
>     On 07.06.2017 at 12:03, Vikram Patil wrote:
>>     Yes, you need an Application.java, which is the way to define the
>>     DAG for an Apex application.
>>
>>     Please have a look at the code in the following example to see how
>>     to write a JMS ActiveMQ based application:
>>
>>     https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>
>>     This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>     RabbitMQInputOperator.class);
>>
>>     https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>
>>     The following properties need to be specified in properties.xml:
>>
>>     * tuple_blast: number of tuples emitted in each burst
>>     * bufferSize: size of the holding buffer
>>     * host: the address for the consumer to connect to the RabbitMQ producer
>>     * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>     * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>     * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>     * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>
>>     Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>
>>     Thanks,
>>     Vikram
>>
>>     On Wed, Jun 7, 2017 at 3:19 PM, <apex@x5h.eu> wrote:
>>>     Hello,
>>>
>>>     I compiled the whole thing but now I don't know exactly how to get it
>>>     running in Apex. Do I need an application.java like in the tutorial? I do
>>>     have a simple RabbitMQ queue up and running on the server. How do I consume
>>>     the messages with Apex and write them to hdfs?
>>>
>>>     Cheers,
>>>
>>>     Manfred
>>>
>>>     The following steps were necessary to get the RabbitMQ test to compile:
>>>
>>>     @TimeoutException
>>>     import java.util.concurrent.TimeoutException;
>>>     public void setup() throws IOException,TimeoutException
>>>     public void teardown() throws IOException,TimeoutException
>>>     protected void runTest(final int testNum) throws IOException
>>>
>>>     @Build jars
>>>     cd apex-malhar/contrib/
>>>     mvn clean package -DskipTests
>>>
>>>     cd apex-malhar/library/
>>>     mvn clean package -DskipTests
>>>     copy packages to project directory
>>>
>>>     @Link them to the project
>>>     Add the following lines to the pom.xml:
>>>     <dependency>
>>>         <groupId>contrib</groupId>
>>>         <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>         <version>1.0</version>
>>>         <scope>system</scope>
>>>         <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>     </dependency>
>>>     <dependency>
>>>         <groupId>lib</groupId>
>>>         <artifactId>com.datatorrent.lib.helper</artifactId>
>>>         <version>1.0</version>
>>>         <scope>system</scope>
>>>         <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>     </dependency>
>>>     <dependency>
>>>         <groupId>contrib</groupId>
>>>         <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>         <version>1.0</version>
>>>         <scope>system</scope>
>>>         <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>     </dependency>
>>>     <dependency>
>>>         <groupId>Attribute</groupId>
>>>         <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>         <version>1.0</version>
>>>         <scope>system</scope>
>>>         <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>     </dependency>
>>>
>>>
>>>     On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>
>>>     Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper are
>>>     under the test directory under malhar-contrib and malhar-library
>>>     respectively. You may need to build these jars yourself with test scope to
>>>     include these packages.
>>>
>>>     On Wed, May 31, 2017 at 9:39 AM, <apex@x5h.eu> wrote:
>>>>     Hello, (mea culpa for messing up the headline the first time)
>>>>
>>>>     I'm currently trying to get the apex-malhar RabbitMQ operator
>>>>     running, but I'm at a complete loss. While the examples run fine,
>>>>     I can't even get RabbitMQInputOperatorTest.java to run.
>>>>
>>>>     First it couldn't find the rabbitmq-client, which was solvable by
>>>>     adding the dependency:
>>>>
>>>>     <dependency>
>>>>         <groupId>com.rabbitmq</groupId>
>>>>         <artifactId>amqp-client</artifactId>
>>>>         <version>4.1.0</version>
>>>>     </dependency>
>>>>
>>>>     But now it doesn't find the packages com.datatorrent.contrib.helper and
>>>>     com.datatorrent.lib.helper and can't find several symbols.
>>>>
>>>>     Needless to say that I'm a beginner regarding Apex so does anyone know
>>>>     what exactly I'm doing wrong here?
>>>>
>>>>     Cheers
>>>>
>>>>     Manfred.
>>>>
>>>>
>
>
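
The property naming rule from the quoted replies above (dt.operator.<operator name>.prop.<property>) can be sketched roughly as follows. This is only an illustration: the class OperatorPropertyKeys and its key method are hypothetical helpers, not part of Apex or Malhar, and the operator name "Consumer", the value 500 and the queue name "hello" are simply taken from the snippets in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Apex or Malhar: it only assembles the
// property names in the form discussed in this thread.
final class OperatorPropertyKeys {

    // Pattern quoted in the thread: dt.operator.<operator name>.prop.<property>
    static String key(String operatorName, String property) {
        return "dt.operator." + operatorName + ".prop." + property;
    }

    public static void main(String[] args) {
        // "Consumer" is the name passed to dag.addOperator(...) in the thread,
        // so it (and not the Java variable name) goes into the property name.
        String operatorName = "Consumer";

        Map<String, String> props = new LinkedHashMap<>();
        props.put(key(operatorName, "tuple_blast"), "500");
        props.put(key(operatorName, "queueName"), "hello");

        // Print the name/value pairs as they would appear in properties.xml.
        for (Map.Entry<String, String> e : props.entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```

The point of the sketch is that the middle segment of the property name must match the string given to addOperator, which is why dt.operator.rabbitMQIn.prop.tuple_blast would not apply to an operator registered as "Consumer".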

