jmeter-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pmouawad <...@git.apache.org>
Subject [GitHub] jmeter pull request #325: 61544 - Added read, browse and clear as communicat...
Date Fri, 10 Nov 2017 20:20:33 GMT
Github user pmouawad commented on a diff in the pull request:

    https://github.com/apache/jmeter/pull/325#discussion_r150328999
  
    --- Diff: src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/JMSSampler.java ---
    @@ -142,48 +173,244 @@ public SampleResult sample(Entry entry) {
             res.sampleStart();
     
             try {
    -            TextMessage msg = createMessage();
    -            if (isOneway()) {
    -                int deliveryMode = isNonPersistent() ? 
    -                        DeliveryMode.NON_PERSISTENT:DeliveryMode.PERSISTENT;
    -                producer.send(msg, deliveryMode, Integer.parseInt(getPriority()), 
    -                        Long.parseLong(getExpiration()));
    -                res.setRequestHeaders(Utils.messageProperties(msg));
    -                res.setResponseOK();
    -                res.setResponseData("Oneway request has no response data", null);
    +            LOGGER.debug("Point-to-point mode: " + getCommunicationstyle());
    +            if (isBrowse()) {
    +                handleBrowse(res);
    +            } else if (isClearQueue()) {
    +                handleClearQueue(res);
    +            } else if (isOneway()) {
    +                handleOneWay(res);
    +            } else if (isRead()) {
    +                handleRead(context, res);
                 } else {
    -                if (!useTemporyQueue()) {
    -                    msg.setJMSReplyTo(receiveQueue);
    -                }
    -                Message replyMsg = executor.sendAndReceive(msg,
    -                        isNonPersistent() ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT,
    -                        Integer.parseInt(getPriority()), 
    -                        Long.parseLong(getExpiration()));
    -                res.setRequestHeaders(Utils.messageProperties(msg));
    -                if (replyMsg == null) {
    -                    res.setResponseMessage("No reply message received");
    -                } else {
    -                    if (replyMsg instanceof TextMessage) {
    -                        res.setResponseData(((TextMessage) replyMsg).getText(), null);
    -                    } else {
    -                        res.setResponseData(replyMsg.toString(), null);
    -                    }
    -                    res.setResponseHeaders(Utils.messageProperties(replyMsg));
    -                    res.setResponseOK();
    -                }
    +                handleRequestResponse(res);
                 }
             } catch (Exception e) {
                 LOGGER.warn(e.getLocalizedMessage(), e);
    -            if (thrown != null){
    +            if (thrown != null) {
                     res.setResponseMessage(thrown.toString());
    -            } else {                
    +            } else {
                     res.setResponseMessage(e.getLocalizedMessage());
                 }
             }
             res.sampleEnd();
             return res;
         }
     
    +    private void handleBrowse(SampleResult res) throws JMSException {
    +        LOGGER.debug("isBrowseOnly");
    +        StringBuffer sb = new StringBuffer("");
    +        res.setSuccessful(true);
    +        sb.append("\n \n  Browse message on Send Queue " + sendQueue.getQueueName());
    +        sb.append(browseQueueDetails(sendQueue, res));
    +        res.setResponseData(sb.toString().getBytes());
    +    }
    +
    +    private void handleClearQueue(SampleResult res) throws JMSException {
    +        LOGGER.debug("isClearQueue");
    +        StringBuffer sb = new StringBuffer("");
    +        res.setSuccessful(true);
    +        sb.append("\n \n  Clear messages on Send Queue " + sendQueue.getQueueName());
    +        sb.append(clearQueue(sendQueue, res));
    +        res.setResponseData(sb.toString().getBytes());
    +    }
    +
    +    private void handleOneWay(SampleResult res) throws JMSException {
    +        LOGGER.debug("isOneWay");
    +        TextMessage msg = createMessage();
    +        int deliveryMode = isNonPersistent() ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT;
    +        producer.send(msg, deliveryMode, Integer.parseInt(getPriority()), Long.parseLong(getExpiration()));
    +        res.setRequestHeaders(Utils.messageProperties(msg));
    +        res.setResponseOK();
    +        res.setResponseData("Oneway request has no response data", null);
    +    }
    +
    +    private void handleRead(JMeterContext context, SampleResult res) {
    +        LOGGER.debug("isRead");
    +        StringBuffer sb = new StringBuffer("");
    +        res.setSuccessful(true);
    +        Sampler sampler = context.getPreviousSampler();
    +        SampleResult sr = context.getPreviousResult();
    +        String jmsSelector = getJMSSelector();
    +        if (jmsSelector.equals("_PREV_SAMPLER_")) {
    +            if (sampler instanceof JMSSampler) {
    +                jmsSelector = sr.getResponseMessage();
    +            }
    +        }
    +        int sampleCounter = 0;
    +        int sampleTries = 0;
    +        String result = null;
    +
    +        StringBuilder buffer = new StringBuilder();
    +        StringBuilder propBuffer = new StringBuilder();
    +
    +        do {
    +            result = browseQueueForConsumption(sendQueue, jmsSelector, res, buffer, propBuffer);
    +            if (result != null) {
    +                sb.append(result);
    +                sb.append('\n');
    +                sampleCounter++;
    +            }
    +            sampleTries++;
    +        } while ((result != null) && (sampleTries < getNumberOfSamplesToAggregateAsInt()));
    +
    +        res.setResponseMessage(sampleCounter + " samples messages received");
    +        res.setResponseData(buffer.toString().getBytes()); // TODO - charset?
    +        res.setResponseHeaders(propBuffer.toString());
    +        if (sampleCounter == 0) {
    +            res.setResponseCode("404");
    +            res.setSuccessful(false);
    +        } else {
    +            res.setResponseCodeOK();
    +            res.setSuccessful(true);
    +        }
    +        res.setResponseMessage(sampleCounter + " message(s) received successfully");
    +        res.setSamplerData(getNumberOfSamplesToAggregateAsInt() + " messages expected");
    +        res.setSampleCount(sampleCounter);
    +    }
    +
    +    private void handleRequestResponse(SampleResult res) throws JMSException {
    +        TextMessage msg = createMessage();
    +        if (!useTemporyQueue()) {
    +            LOGGER.debug("NO TEMP QUEUE");
    +            msg.setJMSReplyTo(receiveQueue);
    +        }
    +        LOGGER.debug("Create temp message");
    +        Message replyMsg = executor.sendAndReceive(msg,
    +                isNonPersistent() ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT,
    +                Integer.parseInt(getPriority()), Long.parseLong(getExpiration()));
    +        res.setRequestHeaders(Utils.messageProperties(msg));
    +        if (replyMsg == null) {
    +            res.setResponseMessage("No reply message received");
    +        } else {
    +            if (replyMsg instanceof TextMessage) {
    +                res.setResponseData(((TextMessage) replyMsg).getText(), null);
    +            } else {
    +                res.setResponseData(replyMsg.toString(), null);
    +            }
    +            res.setResponseHeaders(Utils.messageProperties(replyMsg));
    +            res.setResponseOK();
    +        }
    +    }
    +
    +    private String browseQueueForConsumption(Queue queue, String jmsSelector, SampleResult res, StringBuilder buffer,
    +            StringBuilder propBuffer) {
    +        String retVal = null;
    +        try {
    +            QueueReceiver consumer = session.createReceiver(queue, jmsSelector);
    +            Message reply = consumer.receive(Long.valueOf(getTimeout()));
    +            LOGGER.debug("Message: " + reply);
    +            consumer.close();
    +            if (reply != null) {
    +                res.setResponseMessage("1 message(s) received successfully");
    +                res.setResponseHeaders(reply.toString());
    +                TextMessage msg = (TextMessage) reply;
    +                retVal = msg.getText();
    +                extractContent(buffer, propBuffer, msg);
    +            } else {
    +                res.setResponseMessage("No message received");
    +            }
    +        } catch (Exception ex) {
    +            ex.printStackTrace();
    +            LOGGER.error(ex.getMessage());
    +        }
    +        return retVal;
    +    }
    +
    +    private void extractContent(StringBuilder buffer, StringBuilder propBuffer, Message msg) {
    +        if (msg != null) {
    +            try {
    +                if (msg instanceof TextMessage) {
    +                    buffer.append(((TextMessage) msg).getText());
    +                } else if (msg instanceof ObjectMessage) {
    +                    ObjectMessage objectMessage = (ObjectMessage) msg;
    +                    if (objectMessage.getObject() != null) {
    +                        buffer.append(objectMessage.getObject().getClass());
    +                    } else {
    +                        buffer.append("object is null");
    +                    }
    +                } else if (msg instanceof BytesMessage) {
    +                    BytesMessage bytesMessage = (BytesMessage) msg;
    +                    buffer.append(bytesMessage.getBodyLength() + " bytes received in BytesMessage");
    +                } else if (msg instanceof MapMessage) {
    +                    MapMessage mapm = (MapMessage) msg;
    +                    @SuppressWarnings("unchecked") // MapNames are Strings
    +                    Enumeration<String> enumb = mapm.getMapNames();
    +                    while (enumb.hasMoreElements()) {
    +                        String name = enumb.nextElement();
    +                        Object obj = mapm.getObject(name);
    +                        buffer.append(name);
    +                        buffer.append(",");
    +                        buffer.append(obj.getClass().getCanonicalName());
    +                        buffer.append(",");
    +                        buffer.append(obj);
    +                        buffer.append("\n");
    +                    }
    +                }
    +                Utils.messageProperties(propBuffer, msg);
    +            } catch (JMSException e) {
    +                LOGGER.error(e.getMessage());
    +            }
    +        }
    +    }
    +
    +    private String browseQueueDetails(Queue queue, SampleResult res) {
    +        try {
    +            String messageBodies = new String("\n==== Browsing Messages === \n");
    +            // get some queue details
    +            QueueBrowser qBrowser = session.createBrowser(queue);
    +            // browse the messages
    +            Enumeration<?> e = qBrowser.getEnumeration();
    +            int numMsgs = 0;
    +            // count number of messages
    +            String corrID = "";
    +            while (e.hasMoreElements()) {
    +                TextMessage message = (TextMessage) e.nextElement();
    +                corrID = message.getJMSCorrelationID();
    +                if (corrID == null) {
    +                    corrID = message.getJMSMessageID();
    +                    messageBodies = messageBodies + numMsgs + " - MessageID: " + corrID + ": " + message.getText()
    +                            + "\n";
    +                } else {
    +                    messageBodies = messageBodies + numMsgs + " - CorrelationID: " + corrID + ": " + message.getText()
    +                            + "\n";
    +                }
    +                numMsgs++;
    +            }
    +            res.setResponseMessage(numMsgs + " messages available on the queue");
    +            res.setResponseHeaders(qBrowser.toString());
    +            return (messageBodies + queue.getQueueName() + " has " + numMsgs + " messages");
    +        } catch (Exception e) {
    +            res.setResponseMessage("Error counting message on the queue");
    +            e.printStackTrace();
    +            LOGGER.error(e.getMessage());
    +            return "";
    +        }
    +    }
    +
    +    private String clearQueue(Queue queue, SampleResult res) {
    +        String retVal = null;
    +        try {
    +            QueueReceiver consumer = session.createReceiver(queue);
    +            Message deletedMsg = null;
    +            long deletedMsgCount = 0;
    +            do {
    +                deletedMsg = consumer.receiveNoWait();
    +                if (deletedMsg != null) {
    +                    deletedMsgCount++;
    +                    deletedMsg.acknowledge();
    --- End diff --
    
    Shouldn't this be parameterized? There are 4 modes; if we ACK for AUTO_ACK, that would be wrong, no?


---

Mime
View raw message