activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tim Bain <tb...@alumni.duke.edu>
Subject RE: ActiveMQ custom plugin
Date Tue, 05 Dec 2017 13:59:19 GMT
I'd suggest that you store your cache of known credentials in your plugin.
I'm not sure that you'd have access to the web session from a plugin,
because that's MQTT-specific, so I think your best bet would be to store it
in a Map (better: a Guava cache, so you get free age-off behavior) within
your plugin.

Tim

On Dec 5, 2017 3:50 AM, "Gupta Bharat" <bharat.gupta@scania.com> wrote:

> Hi Tim,
>
> I am having a scenario here. Using custom plugin I am validating my JWT
> tokens and fetching the claims from it.
>
> Claims:
>
> {
>   "unique_name": "gubhpi",
>   "sub": "gubhpi",
>   "kid": "1d884b7a-88ce-40e7-9bb8-f5e8f6b5837d",
>   "ExternalStaffReference": "0a8395d7-50b5-4cd8-ac68-6213da955f1c",
>   "ExternalCustomerReference": "acab53e8-a134-4bb8-a66d-3e80430ec4d1",
>   "AdAccount": "bguvvw",
>   "FunctionalPermissions": "490879",
>   "UserPermissions": [
>     "2482503678",
>     "1006"
>   ],
>   "iss": "https://xyz/",
>   "aud": "58b99b37-db1c78d7fef5",
>   "exp": 1510240047,
>   "nbf": 1510236447
> }
>
> From this I need to fetch "AdAccount": "bguvvw", and store it
> somewhere(please suggest me-- context or session) and when next time user
> sends me message on activemq using mqtt or web socket I need to check the
> message header with the users stored in contextsession or from some object.
> If user is present then we validate else we don’t.
>
>
> Regards,
> Bharat Gupta
> Development & Support | Platform Technician, IXCB | Scania IT
> Mobile: +46700869007, +91-9718483802
> bharat.gupta@scania.com
>
> -----Original Message-----
> From: tbain98@gmail.com [mailto:tbain98@gmail.com] On Behalf Of Tim Bain
> Sent: den 29 november 2017 04:20
> To: ActiveMQ Users <users@activemq.apache.org>
> Cc: users-owner@activemq.apache.org
> Subject: RE: ActiveMQ custom plugin
>
> On Nov 28, 2017 8:37 AM, "Gupta Bharat" <bharat.gupta@scania.com> wrote:
>
> Hello Tim,
>
> Thank you for your help. :)
>
> I have used below method to fetch that text from message when client
> publish on topic and it works.
>
> public void send(ProducerBrokerExchange producerExchange, Message
> messageSend) throws Exception {
> byte[] msg = ((ActiveMQMessage) messageSend).getContent().data;
>
>
> ((ActiveMQTextMessage)messageSend).getText()
> It returns the text from web console only.
>
>
> Really appreciate your help here. :)
>
>
>
> Great, I'm glad I've been able to help.
>
> ------------------------------------------------------------
> ------------------------------------------------------------
> ------------------------------------------------------------
> -----------------------
>
> Apart from this I have a situation here that when I try to execute below
> code:
> 1. Destination dest = session.createQueue("JWT_PAYLOAD"); ---- it is
> creating a queue
> 2. TextMessage claims = session.createTextMessage(payload); --- converting
> the string to message to send it on above made queue
> 3. producer.send(dest, claims); --- send the message on
> ActiveMQ.Advisory.Producer.Queue.JWT_PAYLOAD instead of queue JWT_PAYLOAD
> why?
>
>
> What makes you think that this message on an advisory topic was your
> message instead of a standard advisory message that's sent anytime a
> non-anonymous producer sends a message? What makes you think that thos
> message was sent instead of the message you sent to your queue, rather than
> sent *in addition to* the message on your queue? Can you share
> before-and-after screenshots of your queue's row on the Queues page of the
> web console? And were there any warnings or errors logged in the broker's
> logs when this happened?
>
> 4. Before executing producer.send(dest, claims); ----- it is going on
> byte[] msg = ((ActiveMQMessage) messageSend).getContent().data and
> returning null as it is again checking for Content and data. So my ques is
> why it is going back again on first line? Why it is not simply putting the
> data on queue?
>
>
> I didn't understand what you meant by "it is going back again on first
> line." Are you saying that the broker is putting the original message onto
> your topic? If so, that's because by calling super.send(), you're allowing
> the publishing of the original message to the original topic to continue. I
> advised you to do that because I didn't realize that you wanted to
> transform the message and publish it to a different destination, but since
> it seems like you do, I think you don't want to call super.send() after
> all.
>
> If that's not what you meant, then please help me to better understand your
> question.
>
> @Override
>         public void send(ProducerBrokerExchange producerExchange, Message
> messageSend) throws Exception {
> if(messageSend != null)
> {
>         if(messageSend.getContent() != null)
> {
>         =               byte[] msg = ((ActiveMQMessage)
> messageSend).getContent().data;
>
>                 String message = new String(msg);
>                 logger.warn("messageBody::::::::::::::::              " +
> message);
>
>                 DecodedJWT jwt = JWT.decode(message);
>                 byte[] contentBytes = String.format("%s.%s",
> jwt.getHeader(), jwt.getPayload())
>                                 .getBytes(StandardCharsets.UTF_8);
>                 byte[] signatureBytes = Base64.decodeBase64(jwt.
> getSignature());
>                 String publicKeyPEM = "-----BEGIN PUBLIC KEY-----\n"
>                                 + "please enter your key"
>                                 + "-----END PUBLIC KEY-----";
>
>                 RsaKeyUtil rsaKeyUtil = new RsaKeyUtil();
>                 PublicKey pk = rsaKeyUtil.fromPemEncoded(publicKeyPEM);
>
>                 Security.addProvider(new BouncyCastleProvider());
>                 Signature signatureInstance =
> Signature.getInstance("SHA256withRSA",
> "BC");
>                 signatureInstance.initVerify(pk);
>                 signatureInstance.update(contentBytes);
>                 boolean result = signatureInstance.verify(signatureBytes);
>
>                 byte[] pload = Base64.decodeBase64(jwt.getPayload());
>
>                 String payload = new String(pload);
>                 logger.warn("messageBody::::::::::::::::              " +
> payload);
>                 logger.warn("result::::::::::::::::              " +
> result);
>                 if (result = true) {
>
>                         // Create a ConnectionFactory
>             ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory("vm://localhost");
>
>             // Create a Connection
>             javax.jms.Connection jmsConnection = connectionFactory.
> createConnection();
>             jmsConnection.start();
>
>             // Create a Session
>             Session session = jmsConnection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>                 Destination dest = session.createQueue("JWT_PAYLOAD");
>                         MessageProducer producer =
> session.createProducer(dest);
>                         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
>                         TextMessage claims = session.createTextMessage(
> payload);
>                         claims.setJMSType("TEXT");
>                         logger.warn("CLAIMS>>>>>>>>>>>>>>>>
 " + claims);
>                         producer.send(dest, claims);
>                         logger.fatal("CLAIMS>>>>>>>>>>>>>>>>
 " + claims);
>
>
> //
>                         MessageConsumer consumer =
> session.createConsumer(dest);
>                         consumer.setMessageListener(this);
>
>                 }
>
>                 else {
>
>                         logger.warn("Else Authentication failed. Please
> send Valid Token");
>                 }
>
>                 super.send(producerExchange, messageSend);
>                 // getNext().send(producerExchange, messageSend);
>
>         }
>
>         else
>         {
>                 logger.fatal("MESSAGE SEND CONTENT IS NULL");
>         }
>         }
> else
> {
>         logger.fatal("MESSAGE SEND IS NULL");
> }
>         }
>
>         @Override
>         public void onMessage(javax.jms.Message message) {
>                   String messageText = null;
>                 try {
>                     if (message instanceof TextMessage) {
>                         TextMessage textMessage = (TextMessage) message;
>                         messageText = textMessage.getText();
>                         System.out.println("messageText = " +
> messageText);
>                     }
>                 } catch (JMSException e) {
>                     //Handle the exception appropriately
>                 }
>         }
>
> Regards,
> Bharat Gupta
> Development & Support | Platform Technician, IXCB | Scania IT
> Mobile: +46700869007, +91-9718483802
> bharat.gupta@scania.com
>
> -----Original Message-----
> From: tbain98@gmail.com [mailto:tbain98@gmail.com] On Behalf Of Tim Bain
> Sent: den 27 november 2017 08:12
> To: ActiveMQ Users <users@activemq.apache.org>
> Cc: users-owner@activemq.apache.org
> Subject: Re: ActiveMQ custom plugin
>
> Thank you for providing that code. I spent some time running your code, and
> found the following problems with it, which are causing this to not work as
> you intended:
>
>    1. The signature for the send() method is "public void
>    send(ProducerBrokerExchange producerExchange, Message messageSend)
> throws
>    Exception," not the signature you had defined. Because your signature
>    didn't match the method you were trying to override, you weren't
> actually
>    overriding a method that would be called when a message was sent, but
>    instead you were simply defining a public method that nothing ever
> called.
>    The compiler would have caught this for you if you'd annotated the
> method
>    as @Override; you should be using that annotation for all overriden
>    methods, for exactly this reason.
>    2. messageSend.getContent() returns null for the test message I sent via
>    the web console, but ((ActiveMQTextMessage)messageSend).getText()
> returns
>    the body. I think this same code would work for an MQTT message, but
> since
>    I've never used MQTT you'll need to confirm that.
>    3. I mentioned this earlier, but your send() method needs to call
>    super.send() at the end, otherwise the broker's not going to process any
>    messages and you're going to be wondering why the broker "doesn't work."
>    This doesn't explain why you're not seeing your logging occur, but once
> we
>    get past that problem, this is the next issue you're going to hit.
>
> Tim
>
> On Sun, Nov 26, 2017 at 12:41 PM, Gupta Bharat <bharat.gupta@scania.com>
> wrote:
>
> > Hello,
> >
> > MyBroker.java
> >
> > package jwt.validation;
> >
> > import java.util.HashMap;
> > import java.util.Map;
> >
> > import javax.jms.Destination;
> > import javax.jms.ExceptionListener;
> > import javax.jms.JMSException;
> > import javax.jms.MessageConsumer;
> > import javax.jms.Session;
> > import javax.jms.TextMessage;
> >
> > import org.apache.activemq.ActiveMQConnection;
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.ActiveMQMessageConsumer;
> > import org.apache.activemq.broker.BrokerFilter;
> > import org.apache.activemq.broker.Connection;
> > import org.apache.activemq.broker.ConnectionContext;
> > import org.apache.activemq.broker.ProducerBrokerExchange;
> > import org.apache.activemq.broker.region.Subscription;
> > import org.apache.activemq.command.ActiveMQDestination;
> > import org.apache.activemq.command.ActiveMQMessage;
> > import org.apache.activemq.command.ConnectionInfo;
> > import org.apache.activemq.command.ConsumerInfo;
> > import org.apache.activemq.command.Message;
> > import org.apache.activemq.command.SessionInfo;
> > import org.apache.log4j.Logger;
> >
> >
> > public class MyBroker extends BrokerFilter {
> >
> >                 final static Logger logger = Logger.getLogger(MyBroker.
> > class);
> >                 private Map<String, String> userMap = new HashMap<String,
> > String>();
> >
> >                 public MyBroker(org.apache.activemq.broker.Broker next)
> {
> >                                 super(next);
> >
> >                 }
> >
> >
> >
> >                 public void send(ProducerBrokerExchange producerExchange,
> > ActiveMQMessage messageSend) throws JMSException
> >                 {
> >                                   byte[] messageBytes =
> > messageSend.getContent().data;
> >
> >                                   Message data =
> messageSend.getMessage();
> >                                   logger.warn("
> MESSAGEEEEEEEEEEEEEEEEEEEEE"
> > + data);
> >                                   String message = new
> > String(messageBytes);
> >                                   logger.warn("messageBody" + message);
> >                                   logger.info("messageBody" + message);
> >                                   logger.fatal("messageBody" + message);
> >                                   logger.debug("messageBody" + message);
> >
> >
> >                  }
> >
> >                 public void addConnection(ConnectionContext context,
> > ConnectionInfo info)
> >             throws Exception {
> >
> >                                 info.getUserName();
> >                                 info.toString();
> >
> >
> >                                 logger.warn("info1" +
> info.getUserName());
> >                                 logger.fatal("info2" + info.toString());
> >
> >        String token = context.getUserName();
> >        logger.debug("token1" + token);
> >        logger.warn("token2" + token);
> >        logger.fatal("token3" + token);
> >
> >
> >         // Then call your parent
> >         super.addConnection(context, info);
> >     }
> >
> >     public Subscription addConsumer(ConnectionContext context,
> > ConsumerInfo info) throws Exception {
> >
> >
> >
> >         ActiveMQDestination dest = info.getDestination();
> >         Connection conn = context.getConnection();
> >         if (dest != null) {
> >             String destName = info.getDestination().getPhysicalName();
> >             String clientId = context.getClientId();
> >             String allowedDest = userMap.get(clientId);
> >
> >             logger.info(">>> Got Consumer Add request { Destination: "
+
> > destName
> >                     + ", Remote Address: " + conn.getRemoteAddress()
> >                     + ", ClientID: " + clientId
> >                     + " }");
> >             if (allowedDest != null && (allowedDest.equals("*") ||
> > allowedDest.equals(destName) || destName.startsWith("ActiveMQ"))) {
> >                 logger.info(">>> Subscription allowed");
> >             } else {
> >                 logger.error(">>> Destination not allowed. Subscription
> > denied!");
> >                 throw new Exception(">>> Subscription denied!");
> >             }
> >         } else {
> >             logger.error("<<< Got Consumer Add request from Remote
> > Address:" + conn.getRemoteAddress() + ". But destination is NULL.");
> >         }
> >         return super.addConsumer(context, info);
> >     }
> > }
> >
> > Sample.java
> >
> > package jwt.validation;
> >
> > import org.apache.activemq.ActiveMQMessageProducer;
> > import org.apache.activemq.broker.Broker;
> > import org.apache.activemq.broker.BrokerPlugin;
> > import org.apache.log4j.Logger;
> >
> > public class Sample  implements BrokerPlugin {
> >
> >                 final static Logger logger =
> Logger.getLogger(Sample.class)
> > ;
> >
> >                 public static void main(String[] args) {
> >
> >
> >                 }
> >
> >                 private void runMe(String parameter){
> >
> >                                 if(logger.isDebugEnabled()){
> >                                                 logger.debug("This is
> > debug : " + parameter);
> >                                 }
> >
> >                                 if(logger.isInfoEnabled()){
> >                                                 logger.info("This is
> info
> > : " + parameter);
> >                                 }
> >
> >                                 logger.warn("This is warn : " +
> parameter);
> >                                 logger.error("This is error : " +
> > parameter);
> >                                 logger.fatal("This is fatal : " +
> > parameter);
> >
> >                 }
> >
> >                 public Broker installPlugin(Broker broker) throws
> > Exception {
> >                                 Sample sample = new Sample();
> >                                 sample.runMe("ActiveMQ Hello");
> >
> >                                 return new MyBroker(broker);
> >                 }
> >
> >
> > }
> >
> > The above are two classes I have implemented for my custom plugin.
> >
> > Regards,
> > Bharat Gupta
> > Development & Support | Platform Technician, IXCB | Scania IT
> > Mobile: +46700869007, +91-9718483802
> > bharat.gupta@scania.com<mailto:bharat.gupta@scania.com>
> >
> > From: tbain98@gmail.com [mailto:tbain98@gmail.com] On Behalf Of Tim Bain
> > Sent: Sunday, November 26, 2017 4:31 AM
> > To: ActiveMQ Users <users@activemq.apache.org>
> > Cc: users-owner@activemq.apache.org
> > Subject: RE: ActiveMQ custom plugin
> >
> >
> >
> > On Nov 24, 2017 5:44 PM, "Gupta Bharat" <bharat.gupta@scania.com<mailto:
> > bharat.gupta@scania.com>> wrote:
> >
> > hello Tim,
> >
> >
> >
> > My Custom Plugin is getting installed as I am using log4j to print random
> > stuff and its getting printed in logs.
> > Can you please describe in more detail exactly what methods that logging
> > is done in and how it proves that the plugin was actually installed? If
> > (for example) your logging is only in your constructor or your
> > initialization method, that just proves that Spring made your bean, not
> > that it was installed in the broker as a plugin.
> >
> > I am hereby attaching my codes that you can see.
> > Where? All I see is your activemq.xml snippet below, but no actual code
> > for your plugin.
> >
> > activemq.xml configurations:
> >
> > <plugins>
> >
> >                                <bean id="sample"
> >
> >                                 class="jwt.validation.Sample"
> >
> >                                 xmlns="http://www.
> > springframework.org/schema/beans"/>
> >
> >                                </plugins>
> >
> >
> >
> > Also I would like to explain you my exact requirement.
> >
> >
> >
> > The diagram below shows what I am trying to do it now.
> >
> >
> >
> >
> >
> > [cid:image001.png@01D3658E.4A6B3E20]
> > That sounds like it should be doable via an interceptor based on what I
> > know about them (i.e. the wiki, but no hands-on experience).
> >
> > QUESTIONS:
> >
> >
> >
> > 1.       how can we provide JWT to custom plugin from client?
> > If you have to use MQTT, you'll need to encode it directly into the
> > message body, because that's all MQTT supports. You could for example
> make
> > a JSON payload with two fields: the JWT and the message body. Or you
> could
> > switch to OpenWire and put the JWT into a custom header on the message
> the
> > client is sending.
> >
> > 2.       do we need to store jwt in topic first and then consume it from
> > topic to validate it? (not idle case)
> > You need to put the JWT into the message itself so that your plugin can
> > evaluate the token and then decide what to do with the message based on
> > that evaluation.
> >
> > 3.       can we call restful service from activeMQ broker that can
> > validate token and gives the result back to broker?
> > Yes, as far as I know, you'd be able to do that from within the broker.
> > (I've never done that, but I'm not aware of anything that would prevent
> you
> > from doing it.)
> >
> > Tim please let me know if I am clear with my requirements. Else we can
> > talk more on call/phone.
> >
> >
> >
> > Thanks in advance
> >
> >
> >
> >
> >
> > Regards,
> >
> > Bharat Gupta
> >
> > Development & Support | Platform Technician, IXCB | Scania IT
> >
> > Mobile: +46700869007<tel:+46%2070%20086%2090%2007>, +91-9718483802
> > <tel:+91%2097184%2083802>
> >
> > bharat.gupta@scania.com<mailto:bharat.gupta@scania.com>
> >
> >
> >
> >
> > -----Original Message-----
> > From: tbain98@gmail.com<mailto:tbain98@gmail.com> [mailto:
> > tbain98@gmail.com<mailto:tbain98@gmail.com>] On Behalf Of Tim Bain
> > Sent: Friday, November 24, 2017 7:19 PM
> > To: ActiveMQ Users <users@activemq.apache.org<mailto:
> > users@activemq.apache.org>>
> > Cc: users-owner@activemq.apache.org<mailto:users-owner@
> activemq.apache.org
> > >
> > Subject: RE: ActiveMQ custom plugin
> >
> >
> >
> > 1. Your send() method needs to call super.send() after doing your custom
> >
> > work, otherwise the message will never be sent to the real Broker object.
> >
> > Please review the wiki page I referred to earlier, which covers how to
> >
> > implement an interceptor.
> >
> >
> >
> > 2. To me it sounds like your plugin isn't getting installed. Do you have
> >
> > any indication that it is, from logs or some other mechanism? And can you
> >
> > please share the code of the class that implements BrokerPlugin, to
> confirm
> >
> > that your installPlugin() method looks right, and the XML block where you
> >
> > create the plugin bean?
> >
> >
> >
> > Tim
> >
> >
> >
> > On Nov 24, 2017 8:39 AM, "Gupta Bharat" <bharat.gupta@scania.com<mailto:
> > bharat.gupta@scania.com>> wrote:
> >
> >
> >
> > > Hello,
> >
> > >
> >
> > > I tried implementing the method as below:
> >
> > >
> >
> > >         public void send(ProducerBrokerExchange producerExchange,
> >
> > > ActiveMQMessage messageSend) throws JMSException
> >
> > >          {
> >
> > >                   byte[] messageBytes = messageSend.getContent().data;
> >
> > >                   String message = new String(messageBytes);
> >
> > >                   logger.info<http://logger.info>("messageBody" +
> > message);
> >
> > >         }
> >
> > >
> >
> > > Also I am publishing a message on a topic even though this send()
> method
> >
> > > is not been called as logger.info<http://logger.info> is not getting
> > printed in my Custom
> >
> > > Plugin.
> >
> > >
> >
> > > Could you please help me on this?
> >
> > >
> >
> > > Regards,
> >
> > > Bharat Gupta
> >
> > > Development & Support | Platform Technician, IXCB | Scania IT
> >
> > > Mobile: +46700869007<tel:+46%2070%20086%2090%2007>, +91-9718483802
> > <tel:+91%2097184%2083802>
> >
> > > bharat.gupta@scania.com<mailto:bharat.gupta@scania.com>
> >
> > >
> >
> > >
> >
> > > -----Original Message-----
> >
> > > From: tbain98@gmail.com<mailto:tbain98@gmail.com> [mailto:
> > tbain98@gmail.com] On Behalf Of Tim Bain
> >
> > > Sent: Friday, November 24, 2017 7:15 PM
> >
> > > To: ActiveMQ Users <users@activemq.apache.org<mailto:
> > users@activemq.apache.org>>
> >
> > > Cc: users-owner@activemq.apache.org<mailto:users-owner@
> > activemq.apache.org>
> >
> > > Subject: Re: ActiveMQ custom plugin
> >
> > >
> >
> > > As I said in your other thread, you would do that by implementing code
> in
> >
> > > the send() method of your subclass of BrokerFilter, which you would
> > create
> >
> > > and return in your installPlugin() method.
> >
> > >
> >
> > > Tim
> >
> > >
> >
> > > On Nov 24, 2017 3:51 AM, "Gupta Bharat" <bharat.gupta@scania.com<
> mailto:
> > bharat.gupta@scania.com>> wrote:
> >
> > >
> >
> > > Hello,
> >
> > >
> >
> > > I have a scenario as follows:
> >
> > >
> >
> > >
> >
> > > 1.       I have Custom Plugins that decrypts the data and validates it.
> >
> > >
> >
> > > 2.       I am getting Encrypted data (token) from user that need to be
> >
> > > decrypted on custom plugin.
> >
> > >
> >
> > > 3.       Client is accessing the broker using mqtt protocol
> >
> > >
> >
> > > Need Solution for the following:
> >
> > >
> >
> > > 1.       Before getting data onto topic on ACtiveMQ broker I need to
> >
> > > validate the  data(token) in my custom plugin
> >
> > >
> >
> > > 2.       How to extract a data what customer is sending? So that I can
> >
> > > extract the data, decrypt the message and validate.
> >
> > >
> >
> > >
> >
> > > Implementation so far:
> >
> > >
> >
> > > 1.       Custom Plugin is ready and JAR has been added in
> <ActiveMQ>/libs
> >
> > >
> >
> > > 2.       Added the plugin configuration in activemq.xml
> >
> > >
> >
> > >
> >
> > > Could anyone help me with a solution for the same as it's critical
> case.
> >
> > >
> >
> > > Thank you In Advance
> >
> > >
> >
> > > Regards,
> >
> > > Bharat Gupta
> >
> > > Development & Support | Platform Technician, IXCB | Scania IT
> >
> > > Mobile: +46700869007<tel:+46%2070%20086%2090%2007>, +91-9718483802
> > <tel:+91%2097184%2083802>
> >
> > > bharat.gupta@scania.com<mailto:bharat.gupta@scania.com<mailto:
> > bharat.gupta@scania.com%3cmailto:bharat.gupta@scania.com>>
> >
> > >
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message