apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@x5h.eu
Subject Re: Apex and RabbitMQ problems with the input operator
Date Fri, 09 Jun 2017 13:12:50 GMT
Finally got rid of all errors but now I have the problem that the apex
application does not seem to register at the RabbitMQ exchange.

This is my code:

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    ConsoleOutputOperator console = dag.addOperator("console", new
ConsoleOutputOperator());
    dag.addStream("rand_console",in.outputPort, console.input);

}

}

If I launch the application everthing executes without an error but if i
list the bindings on the exchange, there is none.

Anyone even an idea how i can start to debug this?

Cheers
Manfred.


Am 08.06.2017 um 18:04 schrieb apex@x5h.eu:
>
> Okay i found the error, I copied the LineOutputOperator.java
> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
> from the jmsActiveMQ example and found  there
> public class LineOutputOperator extends AbstractFileOutputOperator<String>
>
> Instead i took the LineOutputOperator.java 
> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>from
> the Kafka 0.9 example there the class is correctly defined for the
> RabbitMQInputOperator
>
> So far so good now it compiles without errors.
>
> Cheers
>
> Manfred
>
> Am 08.06.2017 um 17:38 schrieb apex@x5h.eu:
>>
>> I still don't get it completely: (The rest of the code is in the
>> Email before, this is only the necessary sample)
>>
>>  1. dag.addStream("test", rabbitInput.output, out.input);
>>     Results in the following error:
>>     [ERROR]   symbol:   variable output
>>     [ERROR]   location: variable rabbitInput of type
>>     com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>>  2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>     Results in the following error:
>>     [ERROR]
>>     /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>     no suitable method found for
>>     addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>     [ERROR]     method
>>     com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>     extends T>,com.datatorrent.api.Operator.InputPort<? super T>...)
>>     is not applicable
>>     [ERROR]       (inferred type does not conform to upper bound(s)
>>     [ERROR]         inferred: byte[]
>>     [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
>>     [ERROR]     method
>>     com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>     extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is
>>     not applicable
>>     [ERROR]       (inferred type does not conform to upper bound(s)
>>     [ERROR]         inferred: byte[]
>>     [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
>>     [ERROR]     method
>>     com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>     extends T>,com.datatorrent.api.Operator.InputPort<? super
>>     T>,com.datatorrent.api.Operator.InputPort<? super T>) is not
>>     applicable
>>     [ERROR]       (cannot infer type-variable(s) T
>>     [ERROR]         (actual and formal argument lists differ in length))
>>
>>
>>
>> It seems that on the one hand the RabbitMQInputOperator.class does
>> not have an output method and on the other hand the addStream method
>> only accepts outputPort combined with inputPort methods or output and
>> input methods of the corresponding classes. Does that mean I only can
>> use a class that implements inputPort method for this example?
>>
>> Cheers
>>
>> Manfred.
>>
>>
>>
>> Am 08.06.2017 um 10:05 schrieb apex@x5h.eu:
>>>
>>> Sorry the two Snippets below where from different iterations. The
>>> Error I get is the following:
>>>
>>> [ERROR]
>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>> cannot find symbol
>>> [ERROR]   symbol:   variable output
>>> [ERROR]   location: variable rabbitInput of type
>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>
>>> My Code is as follows:
>>>
>>>
>>> package com.example.rabbitMQ;
>>>
>>> import org.apache.hadoop.conf.Configuration;
>>>
>>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>>> import com.datatorrent.api.StreamingApplication;
>>> import com.datatorrent.api.DAG;
>>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>
>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>> public class RabbitMQApplication implements StreamingApplication
>>> {
>>>
>>>   @Override
>>>   public void populateDAG(DAG dag, Configuration conf)
>>>   {
>>>
>>>     RabbitMQInputOperator rabbitInput =
>>> dag.addOperator("Consumer",RabbitMQInputOperator.class);
>>>     rabbitInput.setHost("localhost");
>>>     rabbitInput.setPort(5672);
>>>     rabbitInput.setExchange("");
>>>     rabbitInput.setQueueName("hello");
>>>     LineOutputOperator out = dag.addOperator("fileOut", new
>>> LineOutputOperator());
>>>
>>>     dag.addStream("data", rabbitInput.output, out.input);
>>> }
>>> }
>>>
>>> Cheers
>>>
>>> Manfred.
>>>
>>>
>>>
>>> Am 08.06.2017 um 04:34 schrieb vikram patil:
>>>> Hi,
>>>> dag.addStream() is actually used to create stream of from one Operator output
port to other operators input port.
>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>> RabbitMQInputOperator.class);
>>>> dag.addStream("data", *rabbitInput*.output, out.input); 
>>>> Looks like your operator name is incorrect? I see in your code
>>>> snippet above, name of of RabbiMQInputOperator is *"Consumer".*
>>>>
>>>> In property name, you need to provide operator name you have
>>>> specified in addOperator(*"NAME OF THE OPERATOR"*,
>>>> RabbitMQInputOperator.class) api call.
>>>>  
>>>>      dt.operator.*rabbitMQIn*.prop.tuple_blast ( Syntax is correct
>>>> correct given your operator name is correct ). 
>>>>
>>>> ( It should be dt.operator.*Consumer*.prop.tuple_blast based on
>>>> your code snippet ).
>>>>
>>>> I think tests which are provided in the Apache Malhar are very
>>>> detailed, they run in local mode as unit tests so we have mocked
>>>> actual rabbitmq by custom message publisher. 
>>>>
>>>> For timebeing you set only queuename and hostname as
>>>>
>>>>    //  set your rabbitmq host . 
>>>> consumer.setHost("localhost"); // set your rabbitmq port
>>>> consumer.setPort(5672) // It depends on your rabbitmq producer
>>>> configuration but by default // its default exchange with "" ( No
>>>> Name is provided ). consumer.setExchange(""); // set your queue
>>>> name consumer.setQueueName("YOUR_QUEUE_NAME")
>>>>
>>>> 	
>>>>
>>>>
>>>> If its okay, could you please share code from your application.java
>>>> and properties.xml here?
>>>>
>>>> Thanks,
>>>> Vikram
>>>>
>>>>
>>>> On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu <mailto:apex@x5h.eu>>
>>>> wrote:
>>>>
>>>>     Thanks very much for the help. The only problem left is that I
>>>>     don't quite understand dag.addstream().
>>>>
>>>>     I tried this
>>>>
>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>     RabbitMQInputOperator.class);
>>>>     dag.addStream("data", rabbitInput.output, out.input); 
>>>>
>>>>     but obviously this doesn't work. What I don't get is the
>>>>     difference between the ActiveMQ example and the RabbitMQ
>>>>     example. I looked over the test examples for RabbitMQ but don't
>>>>     seem to understand the logic behind it.
>>>>
>>>>     Is this the correct wax to specify properties:
>>>>       <property>
>>>>         <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>         <value>500</value>
>>>>       </property>
>>>>
>>>>     Cheers
>>>>     Manfred.
>>>>
>>>>
>>>>     Am 07.06.2017 um 12:03 schrieb Vikram Patil:
>>>>>     Yes, you would need Application.java which will be way to define
a DAG
>>>>>     for Apex Application.
>>>>>
>>>>>     Please have look at the code from following example to find out how
to
>>>>>     write JMS ActiveMQ based example:
>>>>>
>>>>>     https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>     <https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ>
>>>>>
>>>>>     This is how you can instantiate RabbitMQINputOperator and to a dag.
>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>     RabbitMQInputOperator.class);
>>>>>
>>>>>     https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>     <https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag>
>>>>>
>>>>>     Following properties need to be specified in properties.xml
>>>>>
>>>>>     * Properties:<br>
>>>>>     * <b>tuple_blast</b>: Number of tuples emitted in each
burst<br>
>>>>>     * <b>bufferSize</b>: Size of holding buffer<br>
>>>>>     * <b>host</b>:the address for the consumer to connect
to rabbitMQ producer<br>
>>>>>     * <b>exchange</b>:the exchange for the consumer to connect
to rabbitMQ
>>>>>     producer<br>
>>>>>     * <b>exchangeType</b>:the exchangeType for the consumer
to connect to
>>>>>     rabbitMQ producer<br>
>>>>>     * <b>routingKey</b>:the routingKey for the consumer to
connect to
>>>>>     rabbitMQ producer<br>
>>>>>     * <b>queueName</b>:the queueName for the consumer to
connect to
>>>>>     rabbitMQ producer<br>
>>>>>     * <br>
>>>>>
>>>>>     Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>     <https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java>
>>>>>
>>>>>     Thanks,
>>>>>     Vikram
>>>>>
>>>>>     On Wed, Jun 7, 2017 at 3:19 PM,  <apex@x5h.eu> <mailto:apex@x5h.eu>
wrote:
>>>>>>     Hello,
>>>>>>
>>>>>>     I compiled the whole thing but now I don't know exactly how to
get it
>>>>>>     running in Apex. Do I need an application.java like in the tutorial?
I do
>>>>>>     have a simple RabbitMQ queue up and running on the server. How
do I consume
>>>>>>     the messages with Apex and write them to hdfs?
>>>>>>
>>>>>>     Cheers,
>>>>>>
>>>>>>     Manfred
>>>>>>
>>>>>>     Following steps were necessary to get the RabbitMq test to compile
>>>>>>
>>>>>>     @TimeoutException
>>>>>>     import java.util.concurrent.TimeoutException;
>>>>>>     public void setup() throws IOException,TimeoutException
>>>>>>     public void teardown() throws IOException,TimeoutException
>>>>>>     protected void runTest(final int testNum) throws IOException
>>>>>>
>>>>>>     @Build jars
>>>>>>     cd apex-malhar/contrib/
>>>>>>     mvn clean package -DskipTests
>>>>>>
>>>>>>     cd apex-malhar/library/
>>>>>>     mvn clean package -DskipTests
>>>>>>     copy packages to project directory
>>>>>>
>>>>>>     @Link them to the project
>>>>>>     Add following lines to the pom.xml
>>>>>>     <dependency>
>>>>>>         <groupId>contrib</groupId>
>>>>>>         <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>         <version>1.0</version>
>>>>>>         <scope>system</scope>
>>>>>>
>>>>>>     <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>         <groupId>lib</groupId>
>>>>>>         <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>         <version>1.0</version>
>>>>>>         <scope>system</scope>
>>>>>>
>>>>>>     <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>         <groupId>contrib</groupId>
>>>>>>         <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>         <version>1.0</version>
>>>>>>         <scope>system</scope>
>>>>>>
>>>>>>     <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>     </dependency>
>>>>>>     <dependency>
>>>>>>         <groupId>Attribute</groupId>
>>>>>>         <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>         <version>1.0</version>
>>>>>>         <scope>system</scope>
>>>>>>
>>>>>>     <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>     </dependency>
>>>>>>
>>>>>>
>>>>>>     Am 31.05.2017 um 18:57 schrieb Sanjay Pujare:
>>>>>>
>>>>>>     Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper
are
>>>>>>     under the test directory under malhar-contrib and malhar-library
>>>>>>     respectively. You may need to build these jars yourself with
test scope to
>>>>>>     include these packages.
>>>>>>
>>>>>>     On Wed, May 31, 2017 at 9:39 AM, <apex@x5h.eu> <mailto:apex@x5h.eu>
wrote:
>>>>>>>     Hello, (mea culpa for messing up the headline the first time)
>>>>>>>
>>>>>>>     I'm currently trying to get the apex-malhar rabbitmq running.
But I'm at a
>>>>>>>     complete loss, while the examples are running fine I don't
even get the
>>>>>>>     RabbitMQInputOperatorTest.java to run.
>>>>>>>
>>>>>>>     First it couldn't find the rabbitmq-client which was solveable
by adding
>>>>>>>     the dependency:
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>         <groupId>com.rabbitmq</groupId>
>>>>>>>         <artifactId>amqp-client</artifactId>
>>>>>>>         <version>4.1.0</version>
>>>>>>>       </dependency>
>>>>>>>
>>>>>>>     But now it doesn't find the packages com.datatorrent.contrib.helper
and
>>>>>>>     com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>
>>>>>>>     Needless to say that I'm a beginner regarding Apex so does
anyone know
>>>>>>>     what exactly I'm doing wrong here?
>>>>>>>
>>>>>>>     Cheers
>>>>>>>
>>>>>>>     Manfred.
>>>>>>>
>>>>>>>
>>>>
>>>>
>>>
>>
>


Mime
View raw message