apex-users mailing list archives

From a...@x5h.eu
Subject Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters
Date Tue, 27 Jun 2017 09:44:19 GMT
The problem is that I don't see any connections in the RabbitMQ log. I
don't even see any attempts to connect to the server. Usually I should
see at least some failed attempts. I am posting the complete program;
perhaps I have an error somewhere in it.

package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;


@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setRoutingKey("rktest");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}
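
Since the broker log shows no connection attempts at all, one way to rule out a plain network problem independently of Apex is a small TCP probe against the broker port. The following is only a sketch: it assumes the broker listens on RabbitMQ's default AMQP port 5672 on localhost, and the class and method names (BrokerPortProbe, isReachable) are made up for illustration. It only tests whether the port accepts a TCP connection; it does not speak AMQP and says nothing about what the operator itself does.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: defaults match a local broker on the standard AMQP port.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5672;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

If this reports the port as reachable from the node where the operator is supposed to run, the missing connection attempts are more likely caused by the application side than by the network.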

I'm really at a loss here. There are no errors in any log file, but the
stream is also not connecting, for a reason I can't understand. This
is the client library I'm using:
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.1.0</version>
</dependency>
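
On the exchange type and routing key settings discussed in this thread: in RabbitMQ a fanout exchange delivers to every queue bound to it and ignores the routing key, while a direct exchange delivers only where the routing key equals the binding key. The toy model below illustrates just that rule in memory. It is not the amqp-client API and not how RabbitMQInputOperator is implemented; the class name ExchangeRoutingSketch, the Binding type, and the assumption that the queue "task" is bound to the exchange with an empty binding key are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ExchangeRoutingSketch {

    /** A binding ties a queue to an exchange under a binding key. */
    public static final class Binding {
        final String queue;
        final String bindingKey;

        public Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** Returns the queues that would receive a message published with routingKey. */
    public static List<String> route(String exchangeType, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            boolean matches;
            switch (exchangeType) {
                case "fanout":
                    matches = true; // routing key is ignored
                    break;
                case "direct":
                    matches = b.bindingKey.equals(routingKey); // exact match required
                    break;
                default:
                    throw new IllegalArgumentException("unsupported exchange type: " + exchangeType);
            }
            if (matches) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> bindings = new ArrayList<>();
        bindings.add(new Binding("task", "")); // assumed binding, not shown in the thread
        System.out.println(route("fanout", bindings, "rktest")); // prints [task]
        System.out.println(route("direct", bindings, "rktest")); // prints []
    }
}
```

In this model a queue that has no binding to the exchange receives nothing regardless of exchange type, so it may be worth checking the bindings of "task" on the broker side as well.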
 
Cheers
Manfred.

On 26.06.2017 at 17:34, vikram patil wrote:
> If you have used a routing key, please specify it using
> in.setRoutingKey(). I don't see that in your code.
>
> On Mon, Jun 26, 2017 at 9:00 PM, <apex@x5h.eu> wrote:
>
>     I get no exception in the apex.log and yes the queue is durable.
>
>                                         vhost: /
>                                          name: task
>                                   auto_delete: False
>      backing_queue_status.avg_ack_egress_rate: 0.0
>     backing_queue_status.avg_ack_ingress_rate: 0.0
>          backing_queue_status.avg_egress_rate: 0.0
>         backing_queue_status.avg_ingress_rate: 0.5866956420847993
>                    backing_queue_status.delta: ["delta", "undefined", 0, 0, "undefined"]
>                      backing_queue_status.len: 31
>                     backing_queue_status.mode: default
>              backing_queue_status.next_seq_id: 31
>                       backing_queue_status.q1: 0
>                       backing_queue_status.q2: 0
>                       backing_queue_status.q3: 0
>                       backing_queue_status.q4: 31
>         backing_queue_status.target_ram_count: infinity
>                          consumer_utilisation: None
>                                     consumers: 0
>                                       durable: True
>                                     exclusive: False
>
>     The goal here is to connect to RabbitMQ, fetch messages, and
>     write them to the console. I send the messages via a script or
>     directly via the rabbitmqadmin console. Any ideas why the program
>     does not read from RabbitMQ?
>
>     Cheers Manfred.
>
>
>
>     On 26.06.2017 at 17:14, vikram patil wrote:
>>     Hi Manfred,
>>
>>     Are you getting any exceptions in the logs? Check whether your
>>     queue is durable.
>>
>>     Thanks & Regards,
>>     Vikram
>>
>>     On Mon, Jun 26, 2017 at 8:37 PM, <apex@x5h.eu> wrote:
>>
>>         I have a problem getting the connection to RabbitMQ working.
>>
>>         RabbitMQ is hosted on the same server that the Apex
>>         application is running on.
>>
>>         +--------------------+---------+
>>         |        name        |  type   |
>>         +--------------------+---------+
>>         | apex               | fanout  |
>>         +--------------------+---------+
>>
>>         +------+----------+
>>         | name | messages |
>>         +------+----------+
>>         | task | 31       |
>>         +------+----------+
>>
>>         For testing purposes, I declare it in the program this way:
>>
>>           @Override
>>           public void populateDAG(DAG dag, Configuration conf)
>>           {
>>             RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>             in.setHost("localhost");
>>             in.setExchange("apex");
>>             in.setExchangeType("fanout");
>>             in.setQueueName("task");
>>             ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>             dag.addStream("rand_console", in.outputPort, console.input);
>>           }
>>
>>         But a look at the operators shows that it does not fetch any
>>         messages:
>>
>>          {
>>             "id": "1",
>>             "name": "rabbitInput",
>>             "className": "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
>>             "container": null,
>>             "host": null,
>>             "totalTuplesProcessed": "0",
>>             "totalTuplesEmitted": "0",
>>             "tuplesProcessedPSMA": "0",
>>             "tuplesEmittedPSMA": "0",
>>             "cpuPercentageMA": "0.0",
>>             "latencyMA": "0",
>>             "status": "PENDING_DEPLOY",
>>             "lastHeartbeat": "0",
>>             "failureCount": "0",
>>             "recoveryWindowId": "0",
>>             "currentWindowId": "0",
>>             "ports": [],
>>             "unifierClass": null,
>>             "logicalName": "rabbitInput",
>>             "recordingId": null,
>>             "counters": null,
>>             "metrics": null,
>>             "checkpointStartTime": "0",
>>             "checkpointTime": "0",
>>             "checkpointTimeMA": "0"
>>           },
>>
>>         What am I doing wrong here? Since I can configure the
>>         RabbitMQ side, is there a preferred way of configuring it
>>         for Apex?
>>
>>         Cheers
>>
>>         Manfred.
>>
>>
>>
>
>

