[ https://issues.apache.org/jira/browse/NIFI-865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126753#comment-15126753
]
ASF GitHub Bot commented on NIFI-865:
-------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/199#discussion_r51459684
--- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.GetResponse;
+
+/**
+ * Consuming AMQP processor which upon each invocation of
+ * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a
+ * {@link FlowFile} containing the body of the consumed AMQP message and AMQP
+ * properties that came with message which are added to a {@link FlowFile} as
+ * attributes.
+ */
+@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
+@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile and sends
the message to an AMQP Server")
+public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
+
+ public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
+ .name("Queue")
+ .description("Source queue")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles that are received from the AMQP queue are routed
to this relationship")
+ .build();
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+
+ private final static Set<Relationship> relationships;
+
+ /*
+ * Will ensure that the list of property descriptors is build only once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(QUEUE);
+ _propertyDescriptors.addAll(descriptors);
+ propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ /**
+ * Will construct a {@link FlowFile} containing the body of the consumed
+ * AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
+ * not null) and AMQP properties that came with message which are added to a
+ * {@link FlowFile} as attributes, transferring {@link FlowFile} to
+ * 'success' {@link Relationship}.
+ */
+ @Override
+ protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession)
throws ProcessException {
+ GetResponse response = this.targetResource.consume();
+ if (response != null){
+ FlowFile flowFile = processSession.create();
+ BasicProperties amqpProperties = response.getProps();
+ flowFile = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties,
flowFile, processSession);
+ processSession.transfer(flowFile, REL_SUCCESS);
+ }
--- End diff --
perhaps an "else { context.yield() }" here to keep from task churn
> Add processors to Get and Put to/from AMQP-based messaging systems
> ------------------------------------------------------------------
>
> Key: NIFI-865
> URL: https://issues.apache.org/jira/browse/NIFI-865
> Project: Apache NiFi
> Issue Type: Wish
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Oleg Zhurakousky
> Labels: beginner
> Fix For: 0.5.0
>
>
> David Smith in the mailing list has already begun work on this. The link to his current
set of code in Github is here: https://github.com/helicopterman22/nifi_amqp_processors
> He is seeking help with licensing/unit testing/etc.. so that it can end up in a build.
> We have word from another user now that they too are interested in this. They specifically
are using RabbitMQ but we should be able to implement (ideally) a pure AMQP protocol friendly
version and be good to go.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
|