apex-users mailing list archives

From Vikram Patil <vik...@datatorrent.com>
Subject Re: Apex and RabbitMQ problems with the input operator
Date Fri, 07 Jul 2017 09:40:26 GMT
Thanks Manfred for resolving this issue. I checked the code now. As you
suggested, RabbitMQInputOperator seems to support a non-durable exchange
but with durable queues. That seems inconsistent. Please feel free to
create a ticket for an improvement to RabbitMQInputOperator regarding this
issue.

Thanks & Regards,
Vikram

On Fri, Jul 7, 2017 at 2:48 PM, <apex@x5h.eu> wrote:

> After various tests I finally got it all working nicely and for future
> users I'll post here how.
>
> First the rabbitMQ configuration that was the only working one:
> rabbitmqadmin declare exchange name=apex type=fanout durable=false
> rabbitmqadmin declare queue name=test durable=true
> rabbitmqadmin declare binding source="apex" destination_type="queue"
> destination="test" routing_key="rktest"
>
> It is important that Apex only accepts a non-durable exchange. But this
> means you have to recreate it every time you restart your RabbitMQ service.
>
> The "Mkdirs failed to create" error:
> This just means that the DFS service is down or, in my case, that safe mode
> is on. Check it and turn it off with:
> hdfs dfsadmin -safemode get
> hdfs dfsadmin -safemode leave
>
> My example uses the following (I moved the operator values into a
> corresponding *.xml file; I list them here only for better understanding):
>
> @ApplicationAnnotation(name="RabbitMQ2HDFS")
> public class RabbitMQApplication implements StreamingApplication
> {
>
>   @Override
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>     RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
> RabbitMQInputOperator());
>     in.setHost("192.168.33.63");
>     in.setExchange("apex");
>     in.setExchangeType("fanout");
>     in.setQueueName("test");
>
>    LineOutputOperator out = dag.addOperator("fileOut", new
> LineOutputOperator());
>    out.setFilePath("/hdfs/rabbitMQ");
>    out.setBaseName("rabbitOut");
>    out.setMaxLength(1024);
>    out.setRotationWindows(4);
>    dag.addStream("data", in.outputPort, out.input);
>   }
> }
>
> And the corresponding output operator. The only important thing here was
> that it extends AbstractFileOutputOperator<byte[]>:
>
> import java.nio.charset.Charset;
> import java.nio.charset.StandardCharsets;
>
> import javax.validation.constraints.NotNull;
>
> import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
>
> public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
> {
>   private static final String NL = System.lineSeparator();
>   private static final Charset CS = StandardCharsets.UTF_8;
>
>   @NotNull
>   private String baseName;
>
>   @Override
>   public byte[] getBytesForTuple(byte[] t) {
>     String result = new String(t, CS) + NL;
>     return result.getBytes(CS);
>  }
>
>   @Override
>   protected String getFileName(byte[] tuple) {
>     return baseName;
>   }
>
>   public String getBaseName() { return baseName; }
>   public void setBaseName(String v) { baseName = v; }
> }
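The decode-append-encode in getBytesForTuple above can be exercised in isolation; here is a minimal plain-Java sketch of the same logic (no Apex dependencies; the class name is made up for illustration):

```java
import java.nio.charset.StandardCharsets;

public class TupleBytesDemo {
    private static final String NL = System.lineSeparator();

    // Same logic as LineOutputOperator.getBytesForTuple: decode the tuple
    // as UTF-8, append a line separator, and re-encode.
    static byte[] getBytesForTuple(byte[] t) {
        String result = new String(t, StandardCharsets.UTF_8) + NL;
        return result.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] out = getBytesForTuple("hello".getBytes(StandardCharsets.UTF_8));
        // Each tuple becomes one line in the output file.
        System.out.print(new String(out, StandardCharsets.UTF_8));
    }
}
```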
>
> The most pressing issue was that it wouldn't run on the YARN cluster, only
> in local mode. I still have no idea why it didn't run, but my best guess is
> that it was a bad idea in the first place to try the Apex app on a Raspberry
> Pi 3 cluster. I switched to a standard Arch Linux server with 8GB RAM and
> without changing a thing in the application it worked perfectly.
> Thanks for all the help!
>
>
> On 22.06.2017 at 11:33, apex@x5h.eu wrote:
>
> I drilled the error down to this message:
>
> Mkdirs failed to create file:/home/pi/datatorrent/apps/application_
> 1498123667708_0001/checkpoints/2
>
> I guess I have something buggy in my configuration. Does any of you know
> how to solve this error? Should I start the application with the same user
> I'm starting YARN with?
>
> Cheers
>
> Manfred.
>
>
>
> On 10.06.2017 at 14:50, apex@x5h.eu wrote:
>
> Hello,
>
> you were completely right; it seems that there are problems with my test
> scenario regarding the Hadoop/YARN installation, and the application never
> starts. I found these entries in the log:
>
> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.
> resourcemanager.ApplicationMasterService: Registering app attempt :
> appattempt_1495629011552_0011_000001
> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.
> resourcemanager.rmapp.attempt.RMAppAttemptImpl:
> appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.
> resourcemanager.scheduler.capacity.LeafQueue: not starting application as
> amIfStarted exceeds amLimit
> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.
> resourcemanager.scheduler.capacity.LeafQueue: Application added - appId:
> application_1495629011552_0011 user: org.apache.hadoop.yarn.server.
> resourcemanager.scheduler.capacity.LeafQueue$User@11536dc, leaf-queue:
> default #user-pending-applications: 1 #user-active-applications: 1
> #queue-pending-applications: 1 #queue-active-applications: 1
>
> Therefore the application never leaves the state "undefined". Since the
> local tests were running fine and the launch of the application didn't
> raise an error, I missed the problem with the Hadoop installation. Thanks
> for the correct hint to look at the Hadoop cluster.
>
> Cheers
> Manfred.
>
>
> On 09.06.2017 at 15:23, vikram patil wrote:
>
> 1) Are you doing it in your local environment?
> 2) If you are doing it locally, I would suggest the following options:
>      1) If you don't want to create the queue on RabbitMQ yourself, set
> the queue name on the operator:
>         in.setQueueName("YOUR_QUEUE_NAME")
>          The operator will then do the following steps:
>            * Create a durable queue in RabbitMQ.
>            * You have specified exchange and exchangeType, so it will
> create an exchange using this information and bind the created queue to the
> exchange with the default routing key, which will be "".
>         Right now it must be creating an auto-generated, uniquely named
> queue for you.
>
>       2) You can create your own exchange and durable queue using the
> rabbitmq management plugin; you will have to install the plugin for that.
> You can use it to publish some test data as well.
>
Using apex-cli you can check the status of your application; if it's
failing, you should check the logs under userlogs in the Hadoop logs
directory.
>
> Thanks & Regards,
> Vikram
>
>
>
>
> On Fri, Jun 9, 2017 at 6:42 PM, <apex@x5h.eu> wrote:
>
>> Finally got rid of all errors but now I have the problem that the apex
>> application does not seem to register at the RabbitMQ exchange.
>>
>> This is my code:
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>>   @Override
>>   public void populateDAG(DAG dag, Configuration conf)
>>   {
>>     RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
>> RabbitMQInputOperator());
>>     in.setHost("localhost");
>>     //in.setPort(5672);
>>     in.setExchange("apex");
>>     in.setExchangeType("fanout");
>>     ConsoleOutputOperator console = dag.addOperator("console", new
>> ConsoleOutputOperator());
>>     dag.addStream("rand_console",in.outputPort, console.input);
>>
>> }
>>
>> }
>> If I launch the application, everything executes without an error, but if
>> I list the bindings on the exchange, there are none.
>>
>> Does anyone have an idea how I can start to debug this?
>>
>> Cheers
>> Manfred.
>>
>>
>>
>> On 08.06.2017 at 18:04, apex@x5h.eu wrote:
>>
>> Okay, I found the error. I copied the LineOutputOperator.java
>> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>> from the jmsActiveMQ example and found there
>> public class LineOutputOperator extends AbstractFileOutputOperator<String>
>>
>> Instead I took the LineOutputOperator.java
>> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
>> from the Kafka 0.9 example, where the class is correctly defined for the
>> RabbitMQInputOperator.
>>
>> So far so good; now it compiles without errors.
>>
>> Cheers
>>
>> Manfred
>> On 08.06.2017 at 17:38, apex@x5h.eu wrote:
>>
>> I still don't get it completely. (The rest of the code is in the email
>> before; this is only the necessary sample.)
>>
>>    1. dag.addStream("test", rabbitInput.output, out.input);
>>    Results in the following error:
>>    [ERROR]   symbol:   variable output
>>    [ERROR]   location: variable rabbitInput of type
>>    com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>>    2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>    Results in the following error:
>>    [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/
>>    RabbitMQApplication.java:[31,8] no suitable method found for
>>    addStream(java.lang.String,com.datatorrent.api.DefaultOutput
>>    Port<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>    [ERROR]     method com.datatorrent.api.DAG.<T>add
>>    Stream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>    extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is
>>    not applicable
>>    [ERROR]       (inferred type does not conform to upper bound(s)
>>    [ERROR]         inferred: byte[]
>>    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
>>    [ERROR]     method com.datatorrent.api.DAG.<T>add
>>    Stream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>    extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not
>>    applicable
>>    [ERROR]       (inferred type does not conform to upper bound(s)
>>    [ERROR]         inferred: byte[]
>>    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
>>    [ERROR]     method com.datatorrent.api.DAG.<T>add
>>    Stream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>    extends T>,com.datatorrent.api.Operator.InputPort<? super
>>    T>,com.datatorrent.api.Operator.InputPort<? super T>) is not
>>    applicable
>>    [ERROR]       (cannot infer type-variable(s) T
>>    [ERROR]         (actual and formal argument lists differ in length))
>>
>>
>>
>>
>> It seems that, on the one hand, the RabbitMQInputOperator class does not
>> have a port named output, and on the other hand, the addStream method only
>> connects an output port to input ports whose tuple types match. Does that
>> mean I can only use a class whose input port accepts byte[] for this
>> example?
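The second compile error quoted above is ordinary Java generics at work: DAG.addStream infers a single tuple type T from both ports, so a byte[] output port cannot feed a String input port. A stripped-down sketch (the Port classes here are hypothetical stand-ins, not the real Apex API) shows the same rule:

```java
// Simplified stand-ins for Apex's port classes (hypothetical, not the real API).
class OutputPort<T> {}
class InputPort<T> {}

public class StreamTypingDemo {
    // Mirrors the shape of DAG.<T>addStream: both ports must agree on one
    // tuple type T.
    static <T> String addStream(String name, OutputPort<? extends T> out,
                                InputPort<? super T> in) {
        return name;
    }

    public static void main(String[] args) {
        OutputPort<byte[]> bytesOut = new OutputPort<>();
        InputPort<byte[]> bytesIn = new InputPort<>();
        InputPort<String> stringIn = new InputPort<>();

        // Compiles: both sides use byte[] for T.
        System.out.println(addStream("data", bytesOut, bytesIn));

        // Does not compile: no T satisfies both byte[] and String, which is
        // exactly the "inferred type does not conform to upper bound(s)" error.
        // addStream("data", bytesOut, stringIn);
    }
}
```

This is why switching to the LineOutputOperator that extends AbstractFileOutputOperator<byte[]> resolves the error.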
>>
>> Cheers
>>
>> Manfred.
>>
>>
>>
>> On 08.06.2017 at 10:05, apex@x5h.eu wrote:
>>
>> Sorry, the two snippets below were from different iterations. The error I
>> get is the following:
>>
>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/
>> RabbitMQApplication.java:[31,38] cannot find symbol
>> [ERROR]   symbol:   variable output
>> [ERROR]   location: variable rabbitInput of type
>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> My Code is as follows:
>>
>> package com.example.rabbitMQ;
>>
>> import org.apache.hadoop.conf.Configuration;
>>
>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>> import com.datatorrent.api.StreamingApplication;
>> import com.datatorrent.api.DAG;
>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>>   @Override
>>   public void populateDAG(DAG dag, Configuration conf)
>>   {
>>
>>     RabbitMQInputOperator rabbitInput =
>>         dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>     rabbitInput.setHost("localhost");
>>     rabbitInput.setPort(5672);
>>     rabbitInput.setExchange("");
>>     rabbitInput.setQueueName("hello");
>>     LineOutputOperator out = dag.addOperator("fileOut", new
>> LineOutputOperator());
>>
>>     dag.addStream("data", rabbitInput.output, out.input);
>> }
>> }
>>
>> Cheers
>>
>> Manfred.
>>
>>
>>
>> On 08.06.2017 at 04:34, vikram patil wrote:
>>
>> Hi,
>>
>> dag.addStream() is actually used to create a stream from one operator's
>> output port to another operator's input port.
>>
>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>> RabbitMQInputOperator.class);
>> dag.addStream("data", *rabbitInput*.output, out.input);
>>
>> Looks like your operator name is incorrect? I see in your code snippet
>> above, the name of the RabbitMQInputOperator is *"Consumer"*.
>>
>> In property name, you need to provide operator name you have specified in
>> addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class) api
>> call.
>>
>>      dt.operator.*rabbitMQIn*.prop.tuple_blast ( the syntax is correct
>> given that your operator name is correct ).
>>
>> ( It should be dt.operator.*Consumer*.prop.tuple_blast based on your
>> code snippet ).
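Put together, and assuming the value 500 from the earlier mail, the properties.xml entry for the operator named "Consumer" would look like this (a sketch):

```xml
<property>
  <name>dt.operator.Consumer.prop.tuple_blast</name>
  <value>500</value>
</property>
```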
>>
>> I think the tests provided in Apache Malhar are very detailed; they run in
>> local mode as unit tests, so the actual RabbitMQ is mocked by a custom
>> message publisher.
>>
>> For the time being, set only the queue name and host name:
>>
>> consumer.setHost("localhost");            // set your rabbitmq host
>> consumer.setPort(5672);                   // set your rabbitmq port
>> consumer.setExchange("");                 // depends on your producer, but by
>>                                           // default it is the default
>>                                           // exchange "" (no name provided)
>> consumer.setQueueName("YOUR_QUEUE_NAME"); // set your queue name
>>
>>
>>
>> If it's okay, could you please share the code from your Application.java
>> and properties.xml here?
>>
>> Thanks,
>> Vikram
>>
>>
>> On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu> wrote:
>>
>>> Thanks very much for the help. The only problem left is that I don't
>>> quite understand dag.addStream().
>>>
>>> I tried this
>>>
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>> RabbitMQInputOperator.class);
>>> dag.addStream("data", rabbitInput.output, out.input);
>>>
>>>
>>> but obviously this doesn't work. What I don't get is the difference
>>> between the ActiveMQ example and the RabbitMQ example. I looked over the
>>> test examples for RabbitMQ but don't seem to understand the logic behind
>>> them.
>>>
>>> Is this the correct way to specify properties:
>>>   <property>
>>>     <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>     <value>500</value>
>>>   </property>
>>>
>>> Cheers
>>> Manfred.
>>>
>>>
>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>
>>> Yes, you would need an Application.java, which is the way to define a DAG
>>> for an Apex application.
>>>
>>> Please have a look at the code from the following example to find out how
>>> to write a JMS ActiveMQ based application:
>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>
>>> This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>> RabbitMQInputOperator.class);
>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>
>>> The following properties need to be specified in properties.xml:
>>>
>>> * tuple_blast: number of tuples emitted in each burst
>>> * bufferSize: size of the holding buffer
>>> * host: the address for the consumer to connect to the rabbitMQ producer
>>> * exchange: the exchange for the consumer to connect to the rabbitMQ producer
>>> * exchangeType: the exchangeType for the consumer to connect to the rabbitMQ producer
>>> * routingKey: the routingKey for the consumer to connect to the rabbitMQ producer
>>> * queueName: the queueName for the consumer to connect to the rabbitMQ producer
>>>
>>> Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>
>>> Thanks,
>>> Vikram
>>>
>>> On Wed, Jun 7, 2017 at 3:19 PM, <apex@x5h.eu> wrote:
>>>
>>> Hello,
>>>
>>> I compiled the whole thing but now I don't know exactly how to get it
>>> running in Apex. Do I need an application.java like in the tutorial? I do
>>> have a simple RabbitMQ queue up and running on the server. How do I consume
>>> the messages with Apex and write them to hdfs?
>>>
>>> Cheers,
>>>
>>> Manfred
>>>
>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>
>>> @TimeoutException
>>> import java.util.concurrent.TimeoutException;
>>> public void setup() throws IOException, TimeoutException
>>> public void teardown() throws IOException, TimeoutException
>>> protected void runTest(final int testNum) throws IOException
>>>
>>> @Build jars
>>> cd apex-malhar/contrib/
>>> mvn clean package -DskipTests
>>>
>>> cd apex-malhar/library/
>>> mvn clean package -DskipTests
>>> copy packages to project directory
>>>
>>> @Link them to the project
>>> Add following lines to the pom.xml
>>> <dependency>
>>>     <groupId>contrib</groupId>
>>>     <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>     <version>1.0</version>
>>>     <scope>system</scope>
>>>
>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>     <groupId>lib</groupId>
>>>     <artifactId>com.datatorrent.lib.helper</artifactId>
>>>     <version>1.0</version>
>>>     <scope>system</scope>
>>>
>>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>     <groupId>contrib</groupId>
>>>     <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>     <version>1.0</version>
>>>     <scope>system</scope>
>>>
>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>     <groupId>Attribute</groupId>
>>>     <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>     <version>1.0</version>
>>>     <scope>system</scope>
>>>
>>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>>
>>>
>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>
>>> Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper are
>>> under the test directory under malhar-contrib and malhar-library
>>> respectively. You may need to build these jars yourself with test scope to
>>> include these packages.
>>>
>>> On Wed, May 31, 2017 at 9:39 AM, <apex@x5h.eu> wrote:
>>>
>>> Hello, (mea culpa for messing up the headline the first time)
>>>
>>> I'm currently trying to get the apex-malhar RabbitMQ operator running.
>>> But I'm at a complete loss: while the examples are running fine, I don't
>>> even get RabbitMQInputOperatorTest.java to run.
>>>
>>> First it couldn't find the rabbitmq client, which was solvable by adding
>>> the dependency:
>>>
>>> <dependency>
>>>   <groupId>com.rabbitmq</groupId>
>>>   <artifactId>amqp-client</artifactId>
>>>   <version>4.1.0</version>
>>> </dependency>
>>>
>>> But now it doesn't find the packages com.datatorrent.contrib.helper and
>>> com.datatorrent.lib.helper and can't find several symbols.
>>>
>>> Needless to say, I'm a beginner regarding Apex, so does anyone know what
>>> exactly I'm doing wrong here?
>>>
>>> Cheers
>>>
>>> Manfred.
>>>
>>>
