apex-users mailing list archives

From a...@x5h.eu
Subject Re: Apex and RabbitMQ problems with the input operator
Date Thu, 22 Jun 2017 09:33:40 GMT
I drilled the error down to this message:

Mkdirs failed to create
file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2

I guess something is wrong in my configuration. Does anyone know
how to solve this error? Should I start the application as the same
user that I start YARN with?
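
The file: prefix in the path suggests that the checkpoint directory is being
created on the local filesystem rather than on HDFS, so ordinary Unix
permissions on /home/pi/datatorrent would apply. A minimal sketch for checking
this, assuming plain Java with no Hadoop or Apex dependency: it finds the
nearest existing ancestor of a path and reports whether the current user can
write to it. The class name CheckpointDirCheck and its helper methods are made
up for illustration and are not part of Apex or Hadoop.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic helper, not part of Apex or Hadoop.
final class CheckpointDirCheck {

  /** Returns the closest ancestor of p (or p itself) that already exists. */
  public static Path nearestExisting(Path p) {
    Path cur = p.toAbsolutePath().normalize();
    while (cur != null && !Files.exists(cur)) {
      cur = cur.getParent();
    }
    return cur;
  }

  /** True if the nearest existing ancestor is a directory the current user can write to. */
  public static boolean canCreate(Path p) {
    Path base = nearestExisting(p);
    return base != null && Files.isDirectory(base) && Files.isWritable(base);
  }

  public static void main(String[] args) throws IOException {
    // Default path taken from the error message above; pass another path as the first argument.
    Path target = Paths.get(args.length > 0 ? args[0] : "/home/pi/datatorrent/apps");
    Path base = nearestExisting(target);
    System.out.println("user:             " + System.getProperty("user.name"));
    System.out.println("nearest existing: " + base);
    if (base != null) {
      System.out.println("owner:            " + Files.getOwner(base));
      System.out.println("writable:         " + Files.isWritable(base));
    }
    System.out.println("can create:       " + canCreate(target));
  }
}
```

Running it once as the user that launches the application and once as the
user that runs YARN should show whether the two differ in what they may
create under that path.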

Cheers

Manfred.



On 10.06.2017 at 14:50, apex@x5h.eu wrote:
>
> Hello,
>
> you were completely right. It seems there are problems with the
> Hadoop/YARN installation in my test scenario, and the application
> never starts. I found these entries in the log:
>
> 2017-06-10 14:33:02,623 INFO
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
> Registering app attempt : appattempt_1495629011552_0011_000001
> 2017-06-10 14:33:02,623 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
> appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
> 2017-06-10 14:33:02,624 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
> not starting application as amIfStarted exceeds amLimit
> 2017-06-10 14:33:02,624 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
> Application added - appId: application_1495629011552_0011 user:
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc,
> leaf-queue: default #user-pending-applications: 1
> #user-active-applications: 1 #queue-pending-applications: 1
> #queue-active-applications: 1
>
> Therefore the application never leaves the state "undefined". Since
> the local tests were running fine and launching the application
> didn't raise an error, I missed the problem with the Hadoop
> installation. Thanks for the correct hint to look at the Hadoop cluster.
>
> Cheers
> Manfred.
>
>
> On 09.06.2017 at 15:23, vikram patil wrote:
>> 1) Are you doing it in your local environment?
>> 2) If you are doing it locally, I would suggest the following options:
>>      1) If you don't want to create the queue on RabbitMQ yourself,
>> set the queue name on the operator:
>>         in.setQueueName("YOUR_QUEUE_NAME");
>>          The operator will then do the following:
>>            * Create a durable queue in RabbitMQ.
>>            * Since you have specified exchange and exchangeType, create
>>              an exchange using this information and bind the created
>>              queue to it with the default routing key, which is "".
>>         Right now it must be creating an auto-generated, uniquely named
>> queue for you.
>>
>>       2) You can create your own exchange and durable queue using
>> rabbitmq admin. You will have to install RabbitMQ plugins for that.
>> You can use it to publish some test data as well.
>>
>> Using apex-cli you can check the status of your application. If it's
>> failing, you should check the logs under userlogs in the Hadoop logs
>> directory.
>>
>> Thanks & Regards,
>> Vikram
>>
>>   
>>        
>>
>> On Fri, Jun 9, 2017 at 6:42 PM, <apex@x5h.eu> wrote:
>>
>>     Finally got rid of all errors but now I have the problem that the
>>     apex application does not seem to register at the RabbitMQ exchange.
>>
>>     This is my code:
>>
>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>     public class RabbitMQApplication implements StreamingApplication
>>     {
>>
>>       @Override
>>       public void populateDAG(DAG dag, Configuration conf)
>>       {
>>         RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>         in.setHost("localhost");
>>         //in.setPort(5672);
>>         in.setExchange("apex");
>>         in.setExchangeType("fanout");
>>         ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>         dag.addStream("rand_console", in.outputPort, console.input);
>>       }
>>     }
>>
>>     If I launch the application, everything executes without an error,
>>     but if I list the bindings on the exchange, there are none.
>>
>>     Does anyone have an idea how I can start to debug this?
>>
>>     Cheers
>>     Manfred.
>>
>>
>>
>>     On 08.06.2017 at 18:04, apex@x5h.eu wrote:
>>>
>>>     Okay, I found the error. I had copied the LineOutputOperator.java
>>>     <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>>>     from the jmsActiveMQ example, where the class is declared as
>>>     public class LineOutputOperator extends
>>>     AbstractFileOutputOperator<String>
>>>
>>>     Instead, I now took the LineOutputOperator.java
>>>     <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
>>>     from the Kafka 0.9 example, where the class is correctly defined
>>>     for the RabbitMQInputOperator.
>>>
>>>     So far so good: it now compiles without errors.
>>>
>>>     Cheers
>>>
>>>     Manfred
>>>
>>>     On 08.06.2017 at 17:38, apex@x5h.eu wrote:
>>>>
>>>>     I still don't get it completely. (The rest of the code is in
>>>>     the previous email; this is only the relevant sample.)
>>>>
>>>>      1. dag.addStream("test", rabbitInput.output, out.input);
>>>>         Results in the following error:
>>>>         [ERROR]   symbol:   variable output
>>>>         [ERROR]   location: variable rabbitInput of type
>>>>         com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>
>>>>      2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>>>         Results in the following error:
>>>>         [ERROR]
>>>>         /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>>>         no suitable method found for
>>>>         addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>>>         [ERROR]     method
>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>         T>...) is not applicable
>>>>         [ERROR]       (inferred type does not conform to upper bound(s)
>>>>         [ERROR]         inferred: byte[]
>>>>         [ERROR]         upper bound(s):
>>>>         java.lang.String,java.lang.Object)
>>>>         [ERROR]     method
>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>         T>) is not applicable
>>>>         [ERROR]       (inferred type does not conform to upper bound(s)
>>>>         [ERROR]         inferred: byte[]
>>>>         [ERROR]         upper bound(s):
>>>>         java.lang.String,java.lang.Object)
>>>>         [ERROR]     method
>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>         T>,com.datatorrent.api.Operator.InputPort<? super T>) is
>>>>         not applicable
>>>>         [ERROR]       (cannot infer type-variable(s) T
>>>>         [ERROR]         (actual and formal argument lists differ in
>>>>         length))
>>>>
>>>>
>>>>
>>>>     It seems that, on the one hand, the RabbitMQInputOperator class
>>>>     does not have an output field and, on the other hand, the
>>>>     addStream method only accepts outputPort combined with
>>>>     inputPort, or output combined with input, of the
>>>>     corresponding classes. Does that mean I can only use a class
>>>>     that implements inputPort for this example?
>>>>
>>>>     Cheers
>>>>
>>>>     Manfred.
>>>>
>>>>
>>>>
>>>>     On 08.06.2017 at 10:05, apex@x5h.eu wrote:
>>>>>
>>>>>     Sorry, the two snippets below were from different iterations.
>>>>>     The error I get is the following:
>>>>>
>>>>>     [ERROR]
>>>>>     /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>>>>     cannot find symbol
>>>>>     [ERROR]   symbol:   variable output
>>>>>     [ERROR]   location: variable rabbitInput of type
>>>>>     com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>
>>>>>     My Code is as follows:
>>>>>
>>>>>
>>>>>     package com.example.rabbitMQ;
>>>>>
>>>>>     import org.apache.hadoop.conf.Configuration;
>>>>>
>>>>>     import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>>>>     import com.datatorrent.api.annotation.ApplicationAnnotation;
>>>>>     import com.datatorrent.api.StreamingApplication;
>>>>>     import com.datatorrent.api.DAG;
>>>>>     import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>>>
>>>>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>>>     public class RabbitMQApplication implements StreamingApplication
>>>>>     {
>>>>>
>>>>>       @Override
>>>>>       public void populateDAG(DAG dag, Configuration conf)
>>>>>       {
>>>>>
>>>>>         RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>>>         rabbitInput.setHost("localhost");
>>>>>         rabbitInput.setPort(5672);
>>>>>         rabbitInput.setExchange("");
>>>>>         rabbitInput.setQueueName("hello");
>>>>>         LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>>>>
>>>>>         dag.addStream("data", rabbitInput.output, out.input);
>>>>>       }
>>>>>     }
>>>>>
>>>>>     Cheers
>>>>>
>>>>>     Manfred.
>>>>>
>>>>>
>>>>>
>>>>>     On 08.06.2017 at 04:34, vikram patil wrote:
>>>>>>     Hi,
>>>>>>     dag.addStream() is used to create a stream from one operator's
>>>>>>     output port to another operator's input port.
>>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>     RabbitMQInputOperator.class);
>>>>>>     dag.addStream("data", rabbitInput.output, out.input);
>>>>>>     Looks like your operator name is incorrect? In your code
>>>>>>     snippet above, the name of the RabbitMQInputOperator is
>>>>>>     "Consumer".
>>>>>>
>>>>>>     In the property name, you need to provide the operator name you
>>>>>>     specified in the addOperator("NAME OF THE OPERATOR",
>>>>>>     RabbitMQInputOperator.class) API call.
>>>>>>
>>>>>>          dt.operator.rabbitMQIn.prop.tuple_blast (the syntax is
>>>>>>     correct, given that your operator name is correct).
>>>>>>
>>>>>>     (It should be dt.operator.Consumer.prop.tuple_blast based
>>>>>>     on your code snippet.)
>>>>>>
>>>>>>     I think tests which are provided in the Apache Malhar are
>>>>>>     very detailed, they run in local mode as unit tests so we
>>>>>>     have mocked actual rabbitmq by custom message publisher. 
>>>>>>
>>>>>>     For the time being, set only the queue name and host name:
>>>>>>
>>>>>>     // set your rabbitmq host
>>>>>>     consumer.setHost("localhost");
>>>>>>     // set your rabbitmq port
>>>>>>     consumer.setPort(5672);
>>>>>>     // It depends on your rabbitmq producer configuration, but by default
>>>>>>     // it's the default exchange with "" (no name is provided).
>>>>>>     consumer.setExchange("");
>>>>>>     // set your queue name
>>>>>>     consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>>>
>>>>>>     	
>>>>>>
>>>>>>
>>>>>>     If it's okay, could you please share the code from your
>>>>>>     application.java and properties.xml here?
>>>>>>
>>>>>>     Thanks,
>>>>>>     Vikram
>>>>>>
>>>>>>
>>>>>>     On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu> wrote:
>>>>>>
>>>>>>         Thanks very much for the help. The only problem left is
>>>>>>         that I don't quite understand dag.addStream().
>>>>>>
>>>>>>         I tried this
>>>>>>
>>>>>>         RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>         RabbitMQInputOperator.class);
>>>>>>         dag.addStream("data", rabbitInput.output, out.input); 
>>>>>>
>>>>>>         but obviously this doesn't work. What I don't get is the
>>>>>>         difference between the ActiveMQ example and the RabbitMQ
>>>>>>         example. I looked over the test examples for RabbitMQ but
>>>>>>         don't seem to understand the logic behind it.
>>>>>>
>>>>>>         Is this the correct way to specify properties:
>>>>>>           <property>
>>>>>>             <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>>>             <value>500</value>
>>>>>>           </property>
>>>>>>
>>>>>>         Cheers
>>>>>>         Manfred.
>>>>>>
>>>>>>
>>>>>>         On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>>>>         Yes, you would need an Application.java, which is the way to define a DAG
>>>>>>>         for an Apex application.
>>>>>>>
>>>>>>>         Please have a look at the code from the following example to find out how to
>>>>>>>         write a JMS ActiveMQ based example:
>>>>>>>
>>>>>>>         https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>>>
>>>>>>>         This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
>>>>>>>         RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>         RabbitMQInputOperator.class);
>>>>>>>
>>>>>>>         https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>>>
>>>>>>>         Following properties need to be specified in properties.xml
>>>>>>>
>>>>>>>         Properties:
>>>>>>>         * tuple_blast: number of tuples emitted in each burst
>>>>>>>         * bufferSize: size of holding buffer
>>>>>>>         * host: the address for the consumer to connect to the RabbitMQ producer
>>>>>>>         * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>>>>>         * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>>>>>         * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>>>>>         * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>>>>
>>>>>>>         Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>>>
>>>>>>>         Thanks,
>>>>>>>         Vikram
>>>>>>>
>>>>>>>         On Wed, Jun 7, 2017 at 3:19 PM, <apex@x5h.eu> wrote:
>>>>>>>>         Hello,
>>>>>>>>
>>>>>>>>         I compiled the whole thing, but now I don't know exactly how to get it
>>>>>>>>         running in Apex. Do I need an Application.java like in the tutorial? I
>>>>>>>>         have a simple RabbitMQ queue up and running on the server. How do I
>>>>>>>>         consume the messages with Apex and write them to HDFS?
>>>>>>>>
>>>>>>>>         Cheers,
>>>>>>>>
>>>>>>>>         Manfred
>>>>>>>>
>>>>>>>>         The following steps were necessary to get the RabbitMQ test to compile:
>>>>>>>>
>>>>>>>>         @TimeoutException
>>>>>>>>         import java.util.concurrent.TimeoutException;
>>>>>>>>         public void setup() throws IOException, TimeoutException
>>>>>>>>         public void teardown() throws IOException, TimeoutException
>>>>>>>>         protected void runTest(final int testNum) throws IOException
>>>>>>>>
>>>>>>>>         @Build jars
>>>>>>>>         cd apex-malhar/contrib/
>>>>>>>>         mvn clean package -DskipTests
>>>>>>>>
>>>>>>>>         cd apex-malhar/library/
>>>>>>>>         mvn clean package -DskipTests
>>>>>>>>         copy packages to project directory
>>>>>>>>
>>>>>>>>         @Link them to the project
>>>>>>>>         Add following lines to the pom.xml
>>>>>>>>         <dependency>
>>>>>>>>             <groupId>contrib</groupId>
>>>>>>>>             <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>>>             <version>1.0</version>
>>>>>>>>             <scope>system</scope>
>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>         </dependency>
>>>>>>>>         <dependency>
>>>>>>>>             <groupId>lib</groupId>
>>>>>>>>             <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>>>             <version>1.0</version>
>>>>>>>>             <scope>system</scope>
>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>         </dependency>
>>>>>>>>         <dependency>
>>>>>>>>             <groupId>contrib</groupId>
>>>>>>>>             <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>>>             <version>1.0</version>
>>>>>>>>             <scope>system</scope>
>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>>>         </dependency>
>>>>>>>>         <dependency>
>>>>>>>>             <groupId>Attribute</groupId>
>>>>>>>>             <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>>>             <version>1.0</version>
>>>>>>>>             <scope>system</scope>
>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>>>         </dependency>
>>>>>>>>
>>>>>>>>
>>>>>>>>         On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>>>
>>>>>>>>         Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are
>>>>>>>>         under the test directory under malhar-contrib and malhar-library
>>>>>>>>         respectively. You may need to build these jars yourself with test scope to
>>>>>>>>         include these packages.
>>>>>>>>
>>>>>>>>         On Wed, May 31, 2017 at 9:39 AM, <apex@x5h.eu> wrote:
>>>>>>>>>         Hello, (mea culpa for messing up the headline the first time)
>>>>>>>>>
>>>>>>>>>         I'm currently trying to get the apex-malhar rabbitmq running, but I'm at a
>>>>>>>>>         complete loss: while the examples are running fine, I don't even get the
>>>>>>>>>         RabbitMQInputOperatorTest.java to run.
>>>>>>>>>
>>>>>>>>>         First it couldn't find the rabbitmq-client, which was solvable by adding
>>>>>>>>>         the dependency:
>>>>>>>>>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>com.rabbitmq</groupId>
>>>>>>>>>             <artifactId>amqp-client</artifactId>
>>>>>>>>>             <version>4.1.0</version>
>>>>>>>>>           </dependency>
>>>>>>>>>
>>>>>>>>>         But now it doesn't find the packages com.datatorrent.contrib.helper and
>>>>>>>>>         com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>>>
>>>>>>>>>         Needless to say, I'm a beginner regarding Apex, so does anyone know
>>>>>>>>>         what exactly I'm doing wrong here?
>>>>>>>>>
>>>>>>>>>         Cheers
>>>>>>>>>
>>>>>>>>>         Manfred.
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

