activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r800235 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/blob/ src/main/java/org/apache/activemq/command/ src/test/java/org/apache/activemq/blob/
Date Mon, 03 Aug 2009 07:42:44 GMT
Author: rajdavies
Date: Mon Aug  3 07:42:43 2009
New Revision: 800235

URL: http://svn.apache.org/viewvc?rev=800235&view=rev
Log:
Added patch for https://issues.apache.org/activemq/browse/AMQ-1744

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java 
 (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Mon Aug  3 07:42:43 2009
@@ -140,6 +140,11 @@
       <artifactId>xalan</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+    <groupId>commons-net</groupId>
+    <artifactId>commons-net</artifactId>
+    
+</dependency>
 
 
     <!-- not really a dependency at all - just added optionally to get the generator working
-->
@@ -460,6 +465,10 @@
             <!-- https://issues.apache.org/activemq/browse/AMQ-2050 -->
             <exclude>**/ProxyConnectorTest.*</exclude>
             
+            <!-- FTPBlob tests need FTP server running -->
+            
+            <exclude>**/FTPBlob*/</exclude>
+            
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Aug  3 07:42:43 2009
@@ -16,29 +16,12 @@
  */
 package org.apache.activemq;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
+import org.apache.activemq.blob.BlobDownloader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
@@ -57,6 +40,24 @@
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 
 /**
  * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@@ -485,6 +486,9 @@
      */
     private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException
{
         ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
+        if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
+        	((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
+        }
         if (transformer != null) {
             Message transformedMessage = transformer.consumerTransform(session, this, m);
             if (transformedMessage != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
Mon Aug  3 07:42:43 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.net.MalformedURLException;
 import java.util.Enumeration;
 
 import javax.jms.BytesMessage;
@@ -32,6 +33,9 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.blob.BlobDownloader;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMapMessage;
@@ -165,6 +169,17 @@
                 msg.setConnection(connection);
                 msg.setText(textMsg.getText());
                 activeMessage = msg;
+            } else if (message instanceof BlobMessage) {
+            	BlobMessage blobMessage = (BlobMessage)message;
+            	ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
+            	msg.setConnection(connection);
+            	msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
+            	try {
+					msg.setURL(blobMessage.getURL());
+				} catch (MalformedURLException e) {
+					
+				}
+            	activeMessage = msg;
             } else {
                 activeMessage = new ActiveMQMessage();
                 activeMessage.setConnection(connection);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon
Aug  3 07:42:43 2009
@@ -42,6 +42,7 @@
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.blob.BlobDownloader;
 
 /**
  * <P>
@@ -410,6 +411,7 @@
         configureMessage(message);
         message.setURL(url);
         message.setDeletedByBroker(deletedByBroker);
+        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
         return message;
     }
 
@@ -430,6 +432,7 @@
         ActiveMQBlobMessage message = new ActiveMQBlobMessage();
         configureMessage(message);
         message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
+        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
         message.setDeletedByBroker(true);
         message.setName(file.getName());
         return message;
@@ -452,6 +455,7 @@
         ActiveMQBlobMessage message = new ActiveMQBlobMessage();
         configureMessage(message);
         message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
+        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
         message.setDeletedByBroker(true);
         return message;
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
Mon Aug  3 07:42:43 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Represents a strategy of downloading a file/stream from some remote
+ */
+public interface BlobDownloadStrategy {
+    
+    InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException;
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
Mon Aug  3 07:42:43 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+
+/**
+ * Mediator for Blob Download
+ */
+public class BlobDownloader {
+
+    private BlobTransferPolicy blobTransferPolicy;
+    
+    public BlobDownloader(BlobTransferPolicy transferPolicy) {
+        this.blobTransferPolicy = transferPolicy;
+    }
+    
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException
{
+        return getStrategy().getInputStream(message);
+    }
+    
+    public BlobTransferPolicy getBlobTransferPolicy() {
+        return blobTransferPolicy;
+    }
+    
+    public BlobDownloadStrategy getStrategy() {
+        return getBlobTransferPolicy().getDownloadStrategy();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
Mon Aug  3 07:42:43 2009
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.blob;
 
+import java.net.MalformedURLException;
+import java.net.URL;
+
 /**
  * The policy for configuring how BLOBs (Binary Large OBjects) are transferred
  * out of band between producers, brokers and consumers.
@@ -28,6 +31,7 @@
     private String uploadUrl;
     private int bufferSize = 128 * 1024;
     private BlobUploadStrategy uploadStrategy;
+    private BlobDownloadStrategy downloadStrategy;
 
     /**
      * Returns a copy of this policy object
@@ -90,6 +94,13 @@
         return uploadStrategy;
     }
 
+    public BlobDownloadStrategy getDownloadStrategy() {
+        if(downloadStrategy == null) {
+            downloadStrategy = createDownloadStrategy();
+        }
+        return downloadStrategy;
+    }
+
     /**
      * Sets the upload strategy to use for uploading BLOBs to some URL
      */
@@ -108,7 +119,49 @@
         this.bufferSize = bufferSize;
     }
 
+    /**
+     * Returns the upload strategy depending on the information from the
+     * uploadURL. Currently supportet HTTP and FTP
+     * 
+     * @return
+     */
     protected BlobUploadStrategy createUploadStrategy() {
-        return new DefaultBlobUploadStrategy(this);
+    	BlobUploadStrategy strategy;
+    	try {
+            URL url = new URL(getUploadUrl());
+
+            if(url.getProtocol().equalsIgnoreCase("FTP")) {
+                strategy = new FTPBlobUploadStrategy(this);
+            } else {
+                strategy = new DefaultBlobUploadStrategy(this);
     }
+        } catch (MalformedURLException e) {
+                strategy = new DefaultBlobUploadStrategy(this);
+}
+        return strategy;
+    }
+    
+    /**
+     * Returns the download strategy depending on the information from the
+     * uploadURL. Currently supportet HTTP and FTP
+     * 
+     * @return
+     */
+    protected BlobDownloadStrategy createDownloadStrategy() {
+        BlobDownloadStrategy strategy;
+        try {
+            URL url = new URL(getUploadUrl());
+            
+            if(url.getProtocol().equalsIgnoreCase("FTP")) {
+                strategy = new FTPBlobDownloadStrategy();
+            } else {
+                strategy = new DefaultBlobDownloadStrategy();
+            }
+        } catch (MalformedURLException e) {
+            strategy = new DefaultBlobDownloadStrategy();
+        }
+        return strategy;
+    }
+
+    
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
Mon Aug  3 07:42:43 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * A default implementation of {@link BlobDownloadStrategy} which uses the URL
+ * class to download files or streams from a remote URL
+ */
+public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{
+
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException
{
+        URL value = message.getURL();
+        if (value == null) {
+            return null;
+        }
+        return value.openStream();
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
Mon Aug  3 07:42:43 2009
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * A FTP implementation for {@link BlobDownloadStrategy}.
+ */
+public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
+    private String ftpUser;
+    private String ftpPass;
+
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException
{
+        URL url = message.getURL();
+        
+        setUserInformation(url.getUserInfo());
+        String connectUrl = url.getHost();
+        int port = url.getPort() < 1 ? 21 : url.getPort();
+
+        FTPClient ftp = new FTPClient();
+        try {
+        	ftp.connect(connectUrl, port);
+        } catch(ConnectException e) {
+        	throw new JMSException("Problem connecting the FTP-server");
+        }
+        
+        if(!ftp.login(ftpUser, ftpPass)) {
+        	ftp.quit();
+            ftp.disconnect();
+            throw new JMSException("Cant Authentificate to FTP-Server");
+        }
+        String path = url.getPath();
+        String workingDir = path.substring(0, path.lastIndexOf("/"));
+        String file = path.substring(path.lastIndexOf("/")+1);
+        
+        ftp.changeWorkingDirectory(workingDir);
+        ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
+        InputStream input = ftp.retrieveFileStream(file);
+        ftp.quit();
+        ftp.disconnect();
+        
+        return input;
+    }
+    
+    private void setUserInformation(String userInfo) {
+        if(userInfo != null) {
+            String[] userPass = userInfo.split(":");
+            if(userPass.length > 0) this.ftpUser = userPass[0];
+            if(userPass.length > 1) this.ftpPass = userPass[1];
+        } else {
+            this.ftpUser = "anonymous";
+            this.ftpPass = "anonymous";
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
Mon Aug  3 07:42:43 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * A FTP implementation of {@link BlobUploadStrategy}.
+ */
+public class FTPBlobUploadStrategy implements BlobUploadStrategy {
+	
+	private URL url;
+	private String ftpUser = "";
+	private String ftpPass = "";
+	private BlobTransferPolicy transferPolicy;
+	
+	public FTPBlobUploadStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException
{
+		this.transferPolicy = transferPolicy;
+		this.url = new URL(this.transferPolicy.getUploadUrl());
+		
+		setUserInformation(url.getUserInfo());
+	}
+
+	public URL uploadFile(ActiveMQBlobMessage message, File file)
+			throws JMSException, IOException {
+		return uploadStream(message, new FileInputStream(file));
+	}
+
+	public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
+			throws JMSException, IOException {
+		String connectUrl = url.getHost();
+		int port = url.getPort() < 1 ? 21 : url.getPort();
+		
+		FTPClient ftp = new FTPClient();
+		try {
+        	ftp.connect(connectUrl, port);
+        } catch(ConnectException e) {
+        	throw new JMSException("Problem connecting the FTP-server");
+        }
+		if(!ftp.login(ftpUser, ftpPass)) {
+			ftp.quit();
+			ftp.disconnect();
+			throw new JMSException("Cant Authentificate to FTP-Server");
+		}
+		String path = url.getPath();
+        String workingDir = path.substring(0, path.lastIndexOf("/"));
+		String filename = message.getMessageId().toString().replaceAll(":", "_");
+        ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
+        
+        String url;
+        if(!ftp.changeWorkingDirectory(workingDir)) {
+        	url = this.url.toString().replaceFirst(this.url.getPath(), "")+"/";
+        } else {
+        	url = this.url.toString();
+        }
+        
+		ftp.storeFile(filename, in);
+		ftp.quit();
+		ftp.disconnect();
+		
+		return new URL(url + filename);
+	}
+	
+	private void setUserInformation(String userInfo) {
+		if(userInfo != null) {
+			String[] userPass = userInfo.split(":");
+			if(userPass.length > 0) this.ftpUser = userPass[0];
+			if(userPass.length > 1) this.ftpPass = userPass[1];
+		} else {
+			this.ftpUser = "anonymous";
+			this.ftpPass = "anonymous";
+		}
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
Mon Aug  3 07:42:43 2009
@@ -24,6 +24,7 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.BlobMessage;
+import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.blob.BlobUploader;
 import org.apache.activemq.util.JMSExceptionSupport;
 
@@ -44,6 +45,7 @@
     private boolean deletedByBroker;
 
     private transient BlobUploader blobUploader;
+    private transient BlobDownloader blobDownloader;
     private transient URL url;
 
     public Message copy() {
@@ -123,11 +125,10 @@
     }
 
     public InputStream getInputStream() throws IOException, JMSException {
-        URL value = getURL();
-        if (value == null) {
+        if(blobDownloader == null) {
             return null;
         }
-        return value.openStream();
+        return blobDownloader.getInputStream(this);
     }
 
     public URL getURL() throws JMSException {
@@ -154,6 +155,14 @@
         this.blobUploader = blobUploader;
     }
 
+    public BlobDownloader getBlobDownloader() {
+        return blobDownloader;
+    }
+
+    public void setBlobDownloader(BlobDownloader blobDownloader) {
+        this.blobDownloader = blobDownloader;
+    }
+
     public void onSend() throws JMSException {
         super.onSend();
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
Mon Aug  3 07:42:43 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq. 
+ * Also a file called test.txt with the content <b>hello world</b> must be in
the ftptest directory.
+ */
+public class FTPBlobDownloadStrategyTest extends TestCase {
+	
+	public void xtestDownload() {
+		ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+		BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+		InputStream stream;
+		try {
+			message.setURL(new URL("ftp://activemq:activemq@localhost/ftptest/test.txt"));
+			stream = strategy.getInputStream(message);
+			int i = stream.read();
+			StringBuilder sb = new StringBuilder(10);
+			while(i != -1) {
+				sb.append((char)i);
+				i = stream.read();
+			}
+			Assert.assertEquals("hello world", sb.toString());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.assertTrue(false);
+		}
+	}
+	
+	public void xtestWrongAuthentification() {
+		ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+		BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+		try {
+			message.setURL(new URL("ftp://activemq:activemq_wrong@localhost/ftptest/test.txt"));
+			strategy.getInputStream(message);
+		} catch(JMSException e) {
+			Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage());
+			return;
+		} catch(Exception e) {
+			System.out.println(e);
+			Assert.assertTrue("Wrong Exception "+ e, false);
+			return;
+		}
+		
+		Assert.assertTrue("Expect Exception", false);
+	}
+	
+	public void xtestWrongFTPPort() {
+		ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+		BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+		try {
+			message.setURL(new URL("ftp://activemq:activemq@localhost:442/ftptest/test.txt"));
+			strategy.getInputStream(message);
+		} catch(JMSException e) {
+			Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage());
+			return;
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.assertTrue("Wrong Exception "+ e, false);
+			return;
+		}
+		
+		Assert.assertTrue("Expect Exception", false);
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java Mon
Aug  3 07:42:43 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq
+ */
+public class FTPBlobTest extends EmbeddedBrokerTestSupport {
+	
+	private ActiveMQConnection connection;
+
+	protected void setUp() throws Exception {
+		bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=ftp://activemq:activemq@localhost/ftptest/";
+        super.setUp();
+
+        connection = (ActiveMQConnection) createConnection();
+        connection.start();
+	}
+	
+	public void testBlobFile() throws Exception {
+		// first create Message
+		File file = File.createTempFile("amq-data-file-", ".dat");
+        // lets write some data
+		String content = "hello world "+ System.currentTimeMillis();
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        writer.append(content);
+        writer.close();
+        
+        ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        BlobMessage message = session.createBlobMessage(file);
+        
+        producer.send(message);
+        Thread.sleep(1000);
+
+        // check message send
+        Message msg = consumer.receive(1000);
+        Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
+
+        InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
+        StringBuilder b = new StringBuilder();
+        int i = input.read();
+        while(i != -1) {
+        	b.append((char) i);
+        	i = input.read();
+        }
+        Assert.assertEquals(content, b.toString());
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java?rev=800235&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
Mon Aug  3 07:42:43 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.net.URL;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq
+ */
+public class FTPBlobUploadStrategyTest extends EmbeddedBrokerTestSupport {
+	
+	private Connection connection;
+
+	protected void setUp() throws Exception {
+		bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=ftp://activemq:activemq@localhost/ftptest/";
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+        
+        // check if file exist and delete it
+        URL url = new URL("ftp://activemq:activemq@localhost/ftptest/");
+        String connectUrl = url.getHost();
+		int port = url.getPort() < 1 ? 21 : url.getPort();
+		
+		FTPClient ftp = new FTPClient();
+		ftp.connect(connectUrl, port);
+		if(!ftp.login("activemq", "activemq")) {
+			ftp.quit();
+			ftp.disconnect();
+			throw new JMSException("Cant Authentificate to FTP-Server");
+		}
+		ftp.changeWorkingDirectory("ftptest");
+		ftp.deleteFile("testmessage");
+		ftp.quit();
+		ftp.disconnect();
+    }
+	
+	public void testFileUpload() throws Exception {
+		File file = File.createTempFile("amq-data-file-", ".dat");
+        // lets write some data
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        writer.append("hello world");
+        writer.close();
+        
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ((ActiveMQConnection)connection).setCopyMessageOnSend(false);
+        
+        ActiveMQBlobMessage message = (ActiveMQBlobMessage) ((ActiveMQSession)session).createBlobMessage(file);
+        message.setMessageId(new MessageId("testmessage"));
+        message.onSend();
+        Assert.assertEquals("ftp://activemq:activemq@localhost/ftptest/testmessage", message.getURL().toString());

+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message