apex-users mailing list archives

From a...@x5h.eu
Subject Re: Apex and RabbitMQ problems with the input operator
Date Fri, 07 Jul 2017 09:18:55 GMT
After various tests I finally got everything working nicely. For future
users, here is how.

First, the RabbitMQ configuration. This was the only one that worked:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"

It is important that the exchange is non-durable, because Apex only
accepted a non-durable exchange. This also means you have to recreate the
exchange every time you restart your RabbitMQ service.
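
To illustrate the two properties of this setup (fanout delivery and a non-durable exchange), here is a minimal sketch. It is a toy in-memory model, not the RabbitMQ client API; the class ToyBroker and all of its methods are hypothetical names made up for this example. It assumes the usual broker behaviour that a fanout exchange ignores the routing key and that a non-durable exchange, together with its bindings, is gone after a restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a broker; NOT the RabbitMQ client API. */
final class ToyBroker {
  // exchange name -> durable flag
  private final Map<String, Boolean> exchanges = new HashMap<>();
  // exchange name -> names of the queues bound to it
  private final Map<String, List<String>> bindings = new HashMap<>();
  // queue name -> messages delivered so far (queues are treated as durable here)
  private final Map<String, List<String>> queues = new HashMap<>();

  void declareExchange(String name, boolean durable) {
    exchanges.put(name, durable);
    bindings.putIfAbsent(name, new ArrayList<>());
  }

  void declareQueue(String name) {
    queues.putIfAbsent(name, new ArrayList<>());
  }

  /** Binds a queue to an exchange; both must have been declared before. */
  void bind(String exchange, String queue) {
    bindings.get(exchange).add(queue);
  }

  /** Fanout semantics: the routing key is ignored, every bound queue gets a copy. */
  boolean publish(String exchange, String routingKey, String message) {
    if (!exchanges.containsKey(exchange)) {
      return false; // the exchange does not exist (any more)
    }
    for (String queue : bindings.get(exchange)) {
      queues.get(queue).add(message);
    }
    return true;
  }

  /** Simulates a broker restart: non-durable exchanges and their bindings disappear. */
  void restart() {
    exchanges.entrySet().removeIf(e -> !e.getValue());
    bindings.keySet().retainAll(exchanges.keySet());
  }

  List<String> messages(String queue) {
    return queues.get(queue);
  }

  public static void main(String[] args) {
    ToyBroker broker = new ToyBroker();
    broker.declareExchange("apex", false); // durable=false, as in the setup above
    broker.declareQueue("test");
    broker.bind("apex", "test");
    System.out.println(broker.publish("apex", "any-key", "hello"));
    broker.restart();
    // After the restart the exchange has to be declared and bound again.
    System.out.println(broker.publish("apex", "rktest", "lost"));
  }
}
```

In this model, publishing after restart() fails until the exchange is declared and bound again, which mirrors the need to rerun the declare commands after every RabbitMQ restart.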

The "Mkdirs failed to create" error:
This just means that the DFS service is down or, as in my case, that
safemode is on. Check the state and, if necessary, leave safemode:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave
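
If you want to do this check from a small program before launching the application, the following is a minimal sketch. The class name SafeModeCheck and both method names are hypothetical. It assumes that the get command reports the state with the wording "Safe mode is ON" or "Safe mode is OFF"; verify that against the output of your Hadoop version.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class SafeModeCheck {

  /** True if the text reports safemode as ON (assumed wording: "Safe mode is ON"). */
  static boolean isSafeModeOn(String dfsadminOutput) {
    return dfsadminOutput.toUpperCase(Locale.ROOT).contains("SAFE MODE IS ON");
  }

  /** Runs `hdfs dfsadmin -safemode get` and returns its combined stdout/stderr. */
  static String querySafeMode() throws IOException, InterruptedException {
    Process p = new ProcessBuilder("hdfs", "dfsadmin", "-safemode", "get")
        .redirectErrorStream(true)
        .start();
    StringBuilder out = new StringBuilder();
    try (BufferedReader r = new BufferedReader(
        new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = r.readLine()) != null) {
        out.append(line).append('\n');
      }
    }
    p.waitFor();
    return out.toString();
  }

  public static void main(String[] args) {
    // Only the parsing is exercised here, so the example runs without Hadoop.
    System.out.println(isSafeModeOn("Safe mode is ON"));
    System.out.println(isSafeModeOn("Safe mode is OFF"));
  }
}
```

On a machine with the hdfs command on the PATH, isSafeModeOn(querySafeMode()) would combine the two steps.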

My example uses the following. (I moved the operator values into a
corresponding *.xml file; I only list them inline here for better
understanding.)

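import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Consume from the fanout exchange "apex" through the queue "test".
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // Write every message as one line to a file in HDFS.
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);
    out.setRotationWindows(4);

    // The input operator emits byte[] tuples on outputPort.
    dag.addStream("data", in.outputPort, out.input);
  }
}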
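
Since I keep the operator values in an *.xml file, here is a minimal sketch of how the setters above map to property entries. It follows the dt.operator.NAME.prop.PROPERTY naming discussed further down in this thread. The class OperatorProperties and its methods are hypothetical helpers, and the property names (host, exchange, exchangeType, queueName) are assumed to match the setter names. Values are not XML-escaped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class OperatorProperties {

  /** Builds a property key of the form dt.operator.NAME.prop.PROPERTY. */
  static String key(String operatorName, String property) {
    return "dt.operator." + operatorName + ".prop." + property;
  }

  /** Renders the given properties as <property> entries (no XML escaping). */
  static String toXml(String operatorName, Map<String, String> props) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : props.entrySet()) {
      sb.append("<property>\n")
        .append("  <name>").append(key(operatorName, e.getKey())).append("</name>\n")
        .append("  <value>").append(e.getValue()).append("</value>\n")
        .append("</property>\n");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // The operator name must be the one passed to dag.addOperator(...).
    Map<String, String> in = new LinkedHashMap<>();
    in.put("host", "192.168.33.63");
    in.put("exchange", "apex");
    in.put("exchangeType", "fanout");
    in.put("queueName", "test");
    System.out.print(toXml("rabbitInput", in));
  }
}
```

The same helper can be used for the "fileOut" operator; what matters is that the name in the key is the operator name from addOperator, not the Java variable name.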

And the corresponding output operator. The only important thing here is
that it extends AbstractFileOutputOperator<byte[]>, because the
RabbitMQInputOperator emits byte[] tuples:

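import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  // Name of the output file; must be set before launch.
  @NotNull
  private String baseName;

  // Decode the message as UTF-8 and append a line separator.
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // All tuples go to the same file.
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}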
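
To show why the tuple type has to be byte[], here is a minimal sketch that needs no Apex libraries. OutputPort, InputPort, LineSink and addStream below are hypothetical stand-ins, not the Apex API; addStream only imitates, in simplified form, the generic shape from the compiler error quoted further down (an output port of T connects to an input port of a supertype of T).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class PortTypingSketch {

  /** Hypothetical stand-in for an input port that receives tuples of type T. */
  interface InputPort<T> {
    void process(T tuple);
  }

  /** Hypothetical stand-in for an output port that emits tuples of type T. */
  static final class OutputPort<T> {
    private final List<InputPort<? super T>> sinks = new ArrayList<>();

    void emit(T tuple) {
      for (InputPort<? super T> sink : sinks) {
        sink.process(tuple);
      }
    }
  }

  /** Simplified addStream: the sink must accept T or a supertype of T. */
  static <T> void addStream(String name, OutputPort<T> source, InputPort<? super T> sink) {
    source.sinks.add(sink);
  }

  /** Sink for byte[] tuples that turns each one into a UTF-8 line. */
  static final class LineSink implements InputPort<byte[]> {
    final List<String> lines = new ArrayList<>();

    @Override
    public void process(byte[] tuple) {
      lines.add(new String(tuple, StandardCharsets.UTF_8) + System.lineSeparator());
    }
  }

  public static void main(String[] args) {
    OutputPort<byte[]> rabbitOut = new OutputPort<>();
    LineSink fileIn = new LineSink();
    addStream("data", rabbitOut, fileIn); // compiles: byte[] -> byte[]
    rabbitOut.emit("hello".getBytes(StandardCharsets.UTF_8));
    System.out.print(fileIn.lines.get(0));

    // An InputPort<String> sink would be rejected by the compiler here,
    // because String is not a supertype of byte[].
  }
}
```

This is the same situation as with the LineOutputOperator from the jmsActiveMQ example, which is declared for String tuples and therefore cannot be connected to a byte[] output port.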

The most pressing issue was that the application would not run on the
YARN cluster, only in local mode. I still have no idea why it didn't run,
but my best guess is that it was a bad idea from the beginning to try the
Apex app on a Raspberry Pi 3 cluster. I switched to a standard Arch Linux
server with 8GB RAM, and without changing a thing in the application it
worked perfectly.

Thanks for all the help!

On 22.06.2017 at 11:33, apex@x5h.eu wrote:
>
> I drilled the error down to this message:
>
> Mkdirs failed to create
> file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2
>
> I guess I have something wrong in my configuration. Does any of you
> know how to solve this error? Should I start the application with the
> same user that starts YARN?
>
> Cheers
>
> Manfred.
>
>
>
> On 10.06.2017 at 14:50, apex@x5h.eu wrote:
>>
>> Hello,
>>
>> you were completely right: there seem to be problems with the Hadoop
>> and YARN installation in my test scenario, and the application never
>> starts. I found these entries in the log:
>>
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
>> Registering app attempt : appattempt_1495629011552_0011_000001
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
>> appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> not starting application as amIfStarted exceeds amLimit
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> Application added - appId: application_1495629011552_0011 user:
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc,
>> leaf-queue: default #user-pending-applications: 1
>> #user-active-applications: 1 #queue-pending-applications: 1
>> #queue-active-applications: 1
>>
>> Therefore the application never leaves the state "undefined". Since
>> the local tests were running fine and the launch of the application
>> didn't raise an error I missed the problem with the hadoop
>> installation. Thanks for the correct hint to look at the hadoop cluster.
>>
>> Cheers
>> Manfred.
>>
>>
>> On 09.06.2017 at 15:23, vikram patil wrote:
>>> 1) Are you doing it in your local environment?
>>> 2) If you are doing it locally, I would suggest the following options:
>>>      1) If you don't want to create the queue on RabbitMQ yourself,
>>>         set the queue name on the operator:
>>>         in.setQueueName("YOUR_QUEUE_NAME");
>>>         The operator will then do the following steps:
>>>            * Create a durable queue in RabbitMQ.
>>>            * You have specified exchange and exchangeType, so it will
>>>              create an exchange using this information and bind the
>>>              created queue to the exchange with the default routing
>>>              key, which is "".
>>>         Right now it must be creating an auto-generated, uniquely
>>>         named queue for you.
>>>
>>>      2) You can create your own exchange and durable queue using
>>>         rabbitmqadmin. You will have to install the RabbitMQ plugins
>>>         for that. You can use it to publish some test data as well.
>>>
>>> Using apex-cli you can check the status of your application. If it is
>>> failing, you should check the logs under userlogs in the Hadoop logs
>>> directory.
>>>
>>> Thanks & Regards,
>>> Vikram
>>>
>>>   
>>>        
>>>
>>> On Fri, Jun 9, 2017 at 6:42 PM, <apex@x5h.eu> wrote:
>>>
>>>     Finally got rid of all errors but now I have the problem that
>>>     the apex application does not seem to register at the RabbitMQ
>>>     exchange.
>>>
>>>     This is my code:
>>>
>>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>     public class RabbitMQApplication implements StreamingApplication
>>>     {
>>>
>>>       @Override
>>>       public void populateDAG(DAG dag, Configuration conf)
>>>       {
>>>         RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>>         in.setHost("localhost");
>>>         //in.setPort(5672);
>>>         in.setExchange("apex");
>>>         in.setExchangeType("fanout");
>>>         ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>>         dag.addStream("rand_console", in.outputPort, console.input);
>>>       }
>>>     }
>>>
>>>     If I launch the application, everything executes without an error,
>>>     but if I list the bindings on the exchange, there are none.
>>>
>>>     Does anyone have an idea how I can start to debug this?
>>>
>>>     Cheers
>>>     Manfred.
>>>
>>>
>>>
>>>     On 08.06.2017 at 18:04, apex@x5h.eu wrote:
>>>>
>>>>     Okay, I found the error. I had copied the LineOutputOperator.java
>>>>     <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>>>>     from the jmsActiveMQ example, which declares
>>>>     public class LineOutputOperator extends
>>>>     AbstractFileOutputOperator<String>
>>>>
>>>>     Instead I took the LineOutputOperator.java
>>>>     <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
>>>>     from the Kafka 0.9 example. There the class is defined correctly
>>>>     for the RabbitMQInputOperator.
>>>>
>>>>     So far so good: now it compiles without errors.
>>>>
>>>>     Cheers
>>>>
>>>>     Manfred
>>>>
>>>>     On 08.06.2017 at 17:38, apex@x5h.eu wrote:
>>>>>
>>>>>     I still don't get it completely: (The rest of the code is in
>>>>>     the Email before, this is only the necessary sample)
>>>>>
>>>>>      1. dag.addStream("test", rabbitInput.output, out.input);
>>>>>         Results in the following error:
>>>>>         [ERROR]   symbol:   variable output
>>>>>         [ERROR]   location: variable rabbitInput of type
>>>>>         com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>
>>>>>      2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>>>>         Results in the following error:
>>>>>         [ERROR]
>>>>>         /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>>>>         no suitable method found for
>>>>>         addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>>>>         [ERROR]     method
>>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>>         T>...) is not applicable
>>>>>         [ERROR]       (inferred type does not conform to upper
>>>>>         bound(s)
>>>>>         [ERROR]         inferred: byte[]
>>>>>         [ERROR]         upper bound(s):
>>>>>         java.lang.String,java.lang.Object)
>>>>>         [ERROR]     method
>>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>>         T>) is not applicable
>>>>>         [ERROR]       (inferred type does not conform to upper
>>>>>         bound(s)
>>>>>         [ERROR]         inferred: byte[]
>>>>>         [ERROR]         upper bound(s):
>>>>>         java.lang.String,java.lang.Object)
>>>>>         [ERROR]     method
>>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>>         T>,com.datatorrent.api.Operator.InputPort<? super T>) is
>>>>>         not applicable
>>>>>         [ERROR]       (cannot infer type-variable(s) T
>>>>>         [ERROR]         (actual and formal argument lists differ
>>>>>         in length))
>>>>>
>>>>>
>>>>>
>>>>>     It seems that on the one hand the RabbitMQInputOperator.class
>>>>>     does not have an output method and on the other hand the
>>>>>     addStream method only accepts outputPort combined with
>>>>>     inputPort methods or output and input methods of the
>>>>>     corresponding classes. Does that mean I only can use a class
>>>>>     that implements inputPort method for this example?
>>>>>
>>>>>     Cheers
>>>>>
>>>>>     Manfred.
>>>>>
>>>>>
>>>>>
>>>>>     On 08.06.2017 at 10:05, apex@x5h.eu wrote:
>>>>>>
>>>>>>     Sorry, the two snippets below were from different iterations.
>>>>>>     The error I get is the following:
>>>>>>
>>>>>>     [ERROR]
>>>>>>     /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>>>>>     cannot find symbol
>>>>>>     [ERROR]   symbol:   variable output
>>>>>>     [ERROR]   location: variable rabbitInput of type
>>>>>>     com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>>
>>>>>>     My Code is as follows:
>>>>>>
>>>>>>
>>>>>>     package com.example.rabbitMQ;
>>>>>>
>>>>>>     import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>>     import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>>>>>     import com.datatorrent.api.annotation.ApplicationAnnotation;
>>>>>>     import com.datatorrent.api.StreamingApplication;
>>>>>>     import com.datatorrent.api.DAG;
>>>>>>     import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>>>>
>>>>>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>>>>     public class RabbitMQApplication implements StreamingApplication
>>>>>>     {
>>>>>>
>>>>>>       @Override
>>>>>>       public void populateDAG(DAG dag, Configuration conf)
>>>>>>       {
>>>>>>
>>>>>>         RabbitMQInputOperator rabbitInput =
>>>>>>     dag.addOperator("Consumer",RabbitMQInputOperator.class);
>>>>>>         rabbitInput.setHost("localhost");
>>>>>>         rabbitInput.setPort(5672);
>>>>>>         rabbitInput.setExchange("");
>>>>>>         rabbitInput.setQueueName("hello");
>>>>>>         LineOutputOperator out = dag.addOperator("fileOut", new
>>>>>>     LineOutputOperator());
>>>>>>
>>>>>>         dag.addStream("data", rabbitInput.output, out.input);
>>>>>>     }
>>>>>>     }
>>>>>>
>>>>>>     Cheers
>>>>>>
>>>>>>     Manfred.
>>>>>>
>>>>>>
>>>>>>
>>>>>>     On 08.06.2017 at 04:34, vikram patil wrote:
>>>>>>>     Hi,
>>>>>>>     dag.addStream() is used to create a stream from one operator's
>>>>>>>     output port to another operator's input port.
>>>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>     RabbitMQInputOperator.class);
>>>>>>>     dag.addStream("data", *rabbitInput*.output, out.input);
>>>>>>>     Looks like your operator name is incorrect? In your code
>>>>>>>     snippet above, the name of the RabbitMQInputOperator
>>>>>>>     is *"Consumer"*.
>>>>>>>
>>>>>>>     In the property name, you need to provide the operator name you
>>>>>>>     specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>>>>>>     RabbitMQInputOperator.class) API call.
>>>>>>>
>>>>>>>          dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is
>>>>>>>     correct, provided your operator name is correct).
>>>>>>>
>>>>>>>     (It should be dt.operator.*Consumer*.prop.tuple_blast based
>>>>>>>     on your code snippet.)
>>>>>>>
>>>>>>>     I think the tests provided in Apache Malhar are very
>>>>>>>     detailed. They run in local mode as unit tests, so the
>>>>>>>     actual RabbitMQ is mocked by a custom message publisher.
>>>>>>>
>>>>>>>     For the time being, set only the queue name and host name:
>>>>>>>
>>>>>>>     // set your rabbitmq host
>>>>>>>     consumer.setHost("localhost");
>>>>>>>     // set your rabbitmq port
>>>>>>>     consumer.setPort(5672);
>>>>>>>     // It depends on your rabbitmq producer configuration, but by
>>>>>>>     // default it is the default exchange with "" (no name provided).
>>>>>>>     consumer.setExchange("");
>>>>>>>     // set your queue name
>>>>>>>     consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>>>>
>>>>>>>     If it's okay, could you please share the code from your
>>>>>>>     application.java and properties.xml here?
>>>>>>>
>>>>>>>     Thanks,
>>>>>>>     Vikram
>>>>>>>
>>>>>>>
>>>>>>>     On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu> wrote:
>>>>>>>
>>>>>>>         Thanks very much for the help. The only problem left is
>>>>>>>         that I don't quite understand dag.addStream().
>>>>>>>
>>>>>>>         I tried this
>>>>>>>
>>>>>>>         RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>         RabbitMQInputOperator.class);
>>>>>>>         dag.addStream("data", rabbitInput.output, out.input);
>>>>>>>
>>>>>>>         but obviously this doesn't work. What I don't get is the
>>>>>>>         difference between the ActiveMQ example and the RabbitMQ
>>>>>>>         example. I looked over the test examples for RabbitMQ
>>>>>>>         but don't seem to understand the logic behind them.
>>>>>>>
>>>>>>>         Is this the correct way to specify properties?
>>>>>>>           <property>
>>>>>>>             <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>>>>             <value>500</value>
>>>>>>>           </property>
>>>>>>>
>>>>>>>         Cheers
>>>>>>>         Manfred.
>>>>>>>
>>>>>>>
>>>>>>>         On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>>>>>         Yes, you would need an Application.java, which is the way to
>>>>>>>>         define a DAG for an Apex application.
>>>>>>>>
>>>>>>>>         Please have a look at the code of the following example to
>>>>>>>>         find out how to write a JMS ActiveMQ based application:
>>>>>>>>
>>>>>>>>         https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>>>>
>>>>>>>>         This is how you can instantiate a RabbitMQInputOperator and
>>>>>>>>         add it to a DAG:
>>>>>>>>         RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>>         RabbitMQInputOperator.class);
>>>>>>>>
>>>>>>>>         https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>>>>
>>>>>>>>         The following properties need to be specified in properties.xml:
>>>>>>>>
>>>>>>>>         * tuple_blast: number of tuples emitted in each burst
>>>>>>>>         * bufferSize: size of the holding buffer
>>>>>>>>         * host: the address for the consumer to connect to the RabbitMQ producer
>>>>>>>>         * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>>>>>>         * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>>>>>>         * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>>>>>>         * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>>>>>
>>>>>>>>         Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>>>>
>>>>>>>>         Thanks,
>>>>>>>>         Vikram
>>>>>>>>
>>>>>>>>         On Wed, Jun 7, 2017 at 3:19 PM, <apex@x5h.eu> wrote:
>>>>>>>>>         Hello,
>>>>>>>>>
>>>>>>>>>         I compiled the whole thing, but now I don't know exactly how
>>>>>>>>>         to get it running in Apex. Do I need an application.java
>>>>>>>>>         like in the tutorial? I do have a simple RabbitMQ queue up
>>>>>>>>>         and running on the server. How do I consume the messages
>>>>>>>>>         with Apex and write them to HDFS?
>>>>>>>>>
>>>>>>>>>         Cheers,
>>>>>>>>>
>>>>>>>>>         Manfred
>>>>>>>>>
>>>>>>>>>         The following steps were necessary to get the RabbitMQ test
>>>>>>>>>         to compile:
>>>>>>>>>
>>>>>>>>>         @TimeoutException
>>>>>>>>>         import java.util.concurrent.TimeoutException;
>>>>>>>>>         public void setup() throws IOException,TimeoutException
>>>>>>>>>         public void teardown() throws IOException,TimeoutException
>>>>>>>>>         protected void runTest(final int testNum) throws IOException
>>>>>>>>>
>>>>>>>>>         @Build jars
>>>>>>>>>         cd apex-malhar/contrib/
>>>>>>>>>         mvn clean package -DskipTests
>>>>>>>>>
>>>>>>>>>         cd apex-malhar/library/
>>>>>>>>>         mvn clean package -DskipTests
>>>>>>>>>         copy packages to project directory
>>>>>>>>>
>>>>>>>>>         @Link them to the project
>>>>>>>>>         Add following lines to the pom.xml
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>contrib</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>lib</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>contrib</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>Attribute</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>             <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>         On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>>>>
>>>>>>>>>         Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper
>>>>>>>>>         are under the test directory of malhar-contrib and
>>>>>>>>>         malhar-library respectively. You may need to build these jars
>>>>>>>>>         yourself with test scope to include these packages.
>>>>>>>>>
>>>>>>>>>         On Wed, May 31, 2017 at 9:39 AM, <apex@x5h.eu> wrote:
>>>>>>>>>>         Hello, (mea culpa for messing up the headline the first time)
>>>>>>>>>>
>>>>>>>>>>         I'm currently trying to get the apex-malhar rabbitmq running.
>>>>>>>>>>         But I'm at a complete loss: while the examples are running
>>>>>>>>>>         fine, I don't even get the RabbitMQInputOperatorTest.java
>>>>>>>>>>         to run.
>>>>>>>>>>
>>>>>>>>>>         First it couldn't find the rabbitmq-client, which was solvable
>>>>>>>>>>         by adding the dependency:
>>>>>>>>>>
>>>>>>>>>>         <dependency>
>>>>>>>>>>             <groupId>com.rabbitmq</groupId>
>>>>>>>>>>             <artifactId>amqp-client</artifactId>
>>>>>>>>>>             <version>4.1.0</version>
>>>>>>>>>>         </dependency>
>>>>>>>>>>
>>>>>>>>>>         But now it doesn't find the packages
>>>>>>>>>>         com.datatorrent.contrib.helper and com.datatorrent.lib.helper
>>>>>>>>>>         and can't find several symbols.
>>>>>>>>>>
>>>>>>>>>>         Needless to say, I'm a beginner regarding Apex, so does
>>>>>>>>>>         anyone know what exactly I'm doing wrong here?
>>>>>>>>>>
>>>>>>>>>>         Cheers
>>>>>>>>>>
>>>>>>>>>>         Manfred.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>

