activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Curt Jutzi (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AMQ-5387) NIO Fails (for MQTT at least) on back-to-back Transport frames
Date Fri, 10 Oct 2014 21:13:33 GMT
Curt Jutzi created AMQ-5387:
-------------------------------

             Summary: NIO Fails (for MQTT at least) on back-to-back Transport frames
                 Key: AMQ-5387
                 URL: https://issues.apache.org/jira/browse/AMQ-5387
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.11.0
         Environment: Paho as client 
            Reporter: Curt Jutzi
             Fix For: NEEDS_REVIEW


{noformat}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt;

import static org.junit.Assert.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.net.util.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;



/**
 * Runs the Paho-based MQTT tests against the NIO transport ({@code mqtt+nio}).
 */
public class PahoMQTTNIOTest extends PahoMQTTTest  implements MqttCallback {

    AtomicInteger m_receiveCounter = new AtomicInteger();
    String BigMessage
    static ArrayList<MqttClient> mqttClients = null;
    static final Integer staticSyncObj = new Integer(1); 
    String messagePayload = null;
    
    public final int numberOfThreads = 500;
    static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
    
    boolean f_messageReceived = false;
    boolean f_ackReceived = false;
    boolean f_lost = false;

    /**
     * 
     * 
     * @param client
     * @param location
     * @param accountId
     * @param userId
     * @param clientId
     * @param token
     * @param nameSpace
     * @param message
     * @param qos
     * @param f_retained
     * @param f_keepOpen
     * @return
     * @throws MqttException
     */
    private MqttClient pubNameSpace(MqttClient client,  
                                    String    location, 
                                    String    accountId, 
                                    String    userId, 
                                    String    clientId,
                                    String    token, 
                                    String    nameSpace, 
                                    String    message, 
                                    int       qos, 
                                    boolean   f_retained,  
                                    boolean   f_keepOpen) throws MqttException
    {

        try
        {        
            boolean f_wasConnected = true; 
            
            if (client == null)
            {
                f_wasConnected = false; 
                client = new MqttClient(location, clientId/*, persistence*/);
            }
            
            if (!f_wasConnected)
            {
                MqttConnectOptions options = new MqttConnectOptions();
                options.setKeepAliveInterval(60);
                options.setConnectionTimeout(120);
                options.setPassword(token.toCharArray());
                options.setUserName(accountId+":"+userId);
                client.connect(options);
                client.setCallback(this);
            }
    
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(f_retained);
    
            
            client.publish(nameSpace, mqttMessage);
            
            if (!f_keepOpen)
            {
                client.disconnect();
                client.close();
                client = null; 
            }
            return client; 
        }
        catch (MqttPersistenceException e)
        {
            System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES
IN A PERSISTENT WAY Excpetion  - "+e.getMessage());
            if (client != null)
            {
                client.disconnect(); 
                client.close();
                client = null;
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.err.println("Exception e = "+e.getMessage());
            if (client != null)
            {
                client.disconnect(); 
                client.close();
                client = null;
            }
        }
        return null;
    }
    /**
     * 
     * @param location
     * @param accountId
     * @param userId
     * @param clientId
     * @param token
     * @param nameSpace
     * @param message
     * @param qos
     * @param f_retained
     * @param f_keepOpen
     * @return
     * @throws MqttException
     */
    private MqttClient pubNameSpace(String    location, 
                                    String    accountId, 
                                    String    userId, 
                                    String    clientId,
                                    String    token, 
                                    String    nameSpace, 
                                    String    message, 
                                    int       qos, 
                                    boolean   f_retained,  
                                    boolean   f_keepOpen) throws MqttException
    {

        try
        {        
            MqttClient client = new MqttClient(location, clientId/*, persistence*/);
            client.setCallback(this);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setKeepAliveInterval(60);
            options.setConnectionTimeout(120);
            options.setPassword(token.toCharArray());
            options.setUserName(accountId+":"+userId);
    
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(f_retained);
    
            client.connect(options);
            client.publish(nameSpace, mqttMessage);
            
            if (!f_keepOpen)
            {
                client.disconnect();
                client.close();
            }
            return client; 
        }
        catch (MqttPersistenceException e)
        {
            System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT
WAY : "+e.getMessage());
            assertTrue(false);
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.err.println("Exceptoin e = "+e.getMessage()); 
            assertTrue(false);
        }
        return null;
    }
   
    
    /**
     * Copies everything readable from {@code input} to {@code output} in 4096-byte chunks.
     *
     * <p>Neither stream is closed or flushed here.
     *
     * @param input  stream to read until end of stream
     * @param output stream receiving the bytes
     * @return total number of bytes copied
     * @throws IOException if reading or writing fails
     */
    public static long copyLarge(InputStream input, OutputStream output) throws IOException
    {
        final byte[] chunk = new byte[4096];
        long total = 0L;
        for (int read = input.read(chunk); read != -1; read = input.read(chunk))
        {
            output.write(chunk, 0, read);
            total += read;
        }
        return total;
    }
    
    /**
     * Echoes a process's output to {@code System.out}, line by line.
     *
     * <p>Standard output is read to its end first; only then is standard error read.
     * Both are printed to {@code System.out}.
     *
     * @param p process whose output and error streams are read
     * @throws IOException if reading either stream fails
     */
    public static void outputResults (Process p) throws IOException
    {
        final BufferedReader outReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
        final BufferedReader errReader = new BufferedReader(new InputStreamReader(p.getErrorStream()));

        for (String line = outReader.readLine(); line != null; line = outReader.readLine())
        {
            System.out.println(line);
        }
        for (String line = errReader.readLine(); line != null; line = errReader.readLine())
        {
            System.out.println(line);
        }
    }
    
    /**
     * Sleeps in 100 ms steps until a message has arrived, the connection has been lost,
     * or the time budget is used up.
     *
     * @param msec maximum time to wait, in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    void pause_til_done_or_time(int msec) throws InterruptedException
    {
        final int step = 100;
        for (int remaining = msec;
             remaining > 0 && !f_messageReceived && !f_lost;
             remaining -= step)
        {
            Thread.sleep(step);
        }
    }

    /** Tally of messages seen by messageArrived; static, so shared by every instance. */
    static Integer numberOfMessages = Integer.valueOf(0);

    /** Resets the shared message tally to zero. */
    public void clearMessageCount()
    {
        numberOfMessages = Integer.valueOf(0);
    }

    /**
     * @return the current value of the shared message tally
     */
    public Integer getMessageCount()
    {
        final Integer current = numberOfMessages;
        return current;
    }
    /**
     * Waits, in one-second steps, until a delivery acknowledgement has been flagged or
     * the time budget is exhausted.
     *
     * <p>If the waiting thread is interrupted, the interrupt status is restored and the
     * wait ends immediately instead of the interruption being silently discarded.
     *
     * @param msec time budget in milliseconds; the loop stops once it has dropped below zero
     */
    private void waitForItAck(int msec)
    {
        while (!f_ackReceived)
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                // Keep the interrupt visible to the caller and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }

            msec = msec - 1000;

            if (msec < 0)
            {
                break;
            }
        }
    }
    
    /**
     * @return the transport scheme string for this test, {@code "mqtt+nio"}
     */
    @Override
    public String getProtocolScheme()
    {
        return "mqtt+nio";
    }

    /**
     * @return always {@code false}; presumably tells the base test not to use SSL --
     *         confirm against PahoMQTTTest
     */
    @Override
    public boolean isUseSSL()
    {
        return false;
    }

    /**
     * Subscriber-side callback that only counts arriving messages in
     * {@code m_receiveCounter}; connection loss and delivery completion are ignored.
     */
    public class PahoCallback implements MqttCallback {

        /** Intentionally does nothing. */
        @Override
        public void connectionLost(Throwable cause) {
        }

        /** Bumps the enclosing test's receive counter by one for each message. */
        @Override
        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            PahoMQTTNIOTest.this.m_receiveCounter.incrementAndGet();
        }

        /** Intentionally does nothing. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }

    }
 
    // Publishing client shared by all pubThreadBitMsg instances; the first constructor
    // call creates it via pubNameSpace while holding staticSyncObj.
    static   MqttClient BalstTestClient = null;
    // Broker URI passed to pubNameSpace by the publisher threads.
    String   loc = "tcp://localhost:1883";

    /**
     * Publisher thread: every instance publishes {@code BigMessage} once through the
     * shared {@code BalstTestClient}.
     */
    public class pubThreadBitMsg extends Thread {

        /**
         * Creates the shared publishing client on first use. The check and the creation
         * run under {@code staticSyncObj}, so only one constructor call creates it.
         */
        public pubThreadBitMsg()
        {
            synchronized (staticSyncObj)
            {
                try
                {
                    System.out.println("---- pubTheadBitMsg - constructor");
                    if (BalstTestClient != null)
                    {
                        return;
                    }
                    final String topic = "/accounts/cjutzi/users/curt/test";
                    BalstTestClient = pubNameSpace(loc,
                                                   "cjutzi",
                                                   "curt",
                                                   "myclientid_cjutzi_pub",
                                                   "curts_client_pub_token",
                                                   topic,
                                                   "Starting Client",
                                                   1,
                                                   false,
                                                   true);
                    System.out.println("---- pubTheadBitMsg - init");
                }
                catch (MqttException e)
                {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Publishes {@code BigMessage} at QoS 1 on the shared client. No lock is held
         * here, so the publisher threads publish concurrently.
         */
        @Override
        public void run()
        {
            try
            {
                System.out.println("---- pubTheadBitMsg - send");
                final String topic = "/accounts/cjutzi/users/curt/test";
                pubNameSpace(BalstTestClient,
                             loc,
                             "cjutzi",
                             "curt",
                             "myclientid_cjutzi_pub",
                             "curts_client_pub_token",
                             topic,
                             BigMessage,
                             1,
                             false,
                             true);
            }
            catch (MqttException e)
            {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Starts {@code numberOfThreads} publisher threads that all publish through one shared
     * client, waits ten seconds, and expects the subscriber callback to have counted one
     * message per thread.
     *
     * @throws MqttException if the subscriber cannot connect, subscribe or be released
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
    {
        MqttClient subClient = new MqttClient("tcp://localhost:1883",
                "niosubclient",
                new MemoryPersistence());
        try
        {
            MqttConnectOptions cOpts = new MqttConnectOptions();
            cOpts.setCleanSession(true);
            cOpts.setUserName("system:system");
            cOpts.setPassword("system_token".toCharArray());

            subClient.setCallback(new PahoCallback());
            subClient.connect(cOpts);
            // NOTE(review): the subscription is on "nio/test" while pubThreadBitMsg
            // publishes to "/accounts/cjutzi/users/curt/test" -- confirm that the broker
            // under test routes between the two, otherwise the counter can never advance.
            subClient.subscribe("nio/test");

            for (int i = 0; i < numberOfThreads; i++)
            {
                arrThreads.add(new pubThreadBitMsg());
            }
            System.out.println("--started");
            for (int i = 0; i < numberOfThreads; i++)
            {
                arrThreads.get(i).start();
            }
            System.out.println("-- waiting");
            Thread.sleep(10000);
            // assertEquals reports both the expected and the actual count on failure.
            assertEquals("messages received by the subscriber",
                         numberOfThreads, m_receiveCounter.get());
        }
        finally
        {
            // Release the subscriber even when the assertion fails.
            if (subClient.isConnected())
            {
                subClient.disconnect();
            }
            subClient.close();
        }
    }
    
    
    /**
     * Clears the "message received" and "ack received" flags; the "lost" flag is left
     * untouched.
     */
    private void resetFlag()
    {
        f_messageReceived = false;
        f_ackReceived = f_messageReceived;
    }
    /***************************************************************/
    /**              CALL BACKS FOR MQTT                           */
    /***************************************************************/
   
    

    
    /**
     * Paho callback: logs the loss of the connection and raises the {@code f_lost} flag.
     *
     * @param arg0 cause reported by the client; not used
     */
    public void connectionLost(Throwable arg0)
    {
        final String notice = "MQTT - Connection Lost";
        System.out.println(notice);
        f_lost = true;
    }

      /**
       * Paho callback: logs whether the delivery token reports completion and raises the
       * {@code f_ackReceived} flag.
       *
       * @param arg0 token of the finished delivery
       */
      public void deliveryComplete(IMqttDeliveryToken arg0)
      {
          final boolean complete = arg0.isComplete();
          System.out.println("MQTT - delivery complete: Delivery Tokeh = "+complete);
          f_ackReceived = true;
      }
      
      /**
       * 
       */
      public void messageArrived(String arg0, MqttMessage arg1) throws Exception
      {
          synchronized (numberOfMessages)
          {
              numberOfMessages++;   
              System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS:
["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
//            System.out.print(arg1.isDuplicate()?"*":".");
              byte[] payloadBytes = arg1.getPayload(); 
              if (payloadBytes.length >0 )
              {
                  messagePayload = new String(payloadBytes); 
              }
              System.out.println("Message Recieved..."); 
              f_messageReceived = true;
          } 
      }
      
}


{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message