Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 96144 invoked from network); 23 Apr 2007 19:17:54 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 23 Apr 2007 19:17:54 -0000 Received: (qmail 71871 invoked by uid 500); 23 Apr 2007 19:18:01 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 71825 invoked by uid 500); 23 Apr 2007 19:18:01 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 71812 invoked by uid 99); 23 Apr 2007 19:18:01 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Apr 2007 12:18:01 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Apr 2007 12:17:53 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 5B5681A9838; Mon, 23 Apr 2007 12:17:33 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r531566 - in /activemq/camel/trunk/camel-file: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/camel/ src/main/java/org/apache/camel/component/ src/main/java/org/apache/camel/component/... Date: Mon, 23 Apr 2007 19:17:33 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070423191733.5B5681A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Mon Apr 23 12:17:31 2007 New Revision: 531566 URL: http://svn.apache.org/viewvc?view=rev&rev=531566 Log: An initial cut of camel-file Added: activemq/camel/trunk/camel-file/ activemq/camel/trunk/camel-file/pom.xml (with props) activemq/camel/trunk/camel-file/src/ activemq/camel/trunk/camel-file/src/main/ activemq/camel/trunk/camel-file/src/main/java/ activemq/camel/trunk/camel-file/src/main/java/org/ activemq/camel/trunk/camel-file/src/main/java/org/apache/ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java (with props) activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java (with props) activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java (with props) activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java (with props) activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileMessage.java (with props) activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java (with props) activemq/camel/trunk/camel-file/src/test/ activemq/camel/trunk/camel-file/src/test/java/ activemq/camel/trunk/camel-file/target/ Added: activemq/camel/trunk/camel-file/pom.xml URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/pom.xml?view=auto&rev=531566 ============================================================================== --- activemq/camel/trunk/camel-file/pom.xml (added) +++ activemq/camel/trunk/camel-file/pom.xml Mon Apr 23 12:17:31 2007 @@ -0,0 +1,94 @@ + + + + + + + 4.0.0 + + + org.apache.camel + camel-parent + 1.0-SNAPSHOT + + + camel-file + Camel :: File + A file reading/writing component + + + + + org.apache.camel + camel-core + + + + commons-logging + commons-logging + false + + + + junit + junit + test + + + + + + + + + + + maven-jar-plugin + + + + test-jar + + + + + + + + + Propchange: activemq/camel/trunk/camel-file/pom.xml ------------------------------------------------------------------------------ svn:executable = * Added: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java?view=auto&rev=531566 ============================================================================== --- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java (added) +++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java Mon Apr 23 12:17:31 2007 @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.File; +import java.util.Map; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.util.IntrospectionSupport; + +/** + * @version $Revision: 523772 $ + */ +public class FileComponent extends DefaultComponent { + public FileComponent() { + } + + public FileComponent(CamelContext context) { + super(context); + } + + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { + File file = new File(remaining); + FileEndpoint result = new FileEndpoint(file, remaining, this); + IntrospectionSupport.setProperties(result, parameters); + return result; + } + +} Propchange: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java?view=auto&rev=531566 ============================================================================== --- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java (added) +++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java Mon Apr 23 12:17:31 2007 @@ -0,0 +1,172 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.SocketAddress; +import java.nio.channels.FileChannel; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Lock; +import javax.management.Query; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.PollingConsumer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision: 523016 $ + */ +public class FileConsumer extends PollingConsumer { + private static final transient Log log = LogFactory.getLog(FileConsumer.class); + + private final FileEndpoint endpoint; + private boolean recursive=true; + private boolean attemptFileLock=false; + private String regexPattern = ""; + private long lastPollTime = 0l; + + + + public FileConsumer(final FileEndpoint endpoint, Processor processor,ScheduledExecutorService executor) { + super(endpoint, processor,executor); + this.endpoint = endpoint; + + + } + protected void poll() throws Exception { + pollFileOrDirectory(endpoint.getFile(),isRecursive()); + lastPollTime=System.currentTimeMillis(); + } + + + protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) { + if (!fileOrDirectory.isDirectory()) { + pollFile(fileOrDirectory); // process the file + } + else if (processDir) { + log.debug("Polling directory " + fileOrDirectory); + File[] files = fileOrDirectory.listFiles(); + for (int i = 0; i < files.length; i++) { + pollFileOrDirectory(files[i], isRecursive()); // self-recursion + } + } + else { + log.debug("Skipping directory " + fileOrDirectory); + } + } + + protected void pollFile(final File file) { + if (file.exists() && file.lastModified() > lastPollTime) { + if (isValidFile(file)) { + processFile(file); + } + } + } + + + + protected void processFile(File file) { + getProcessor().process(endpoint.createExchange(file)); + } + + + + protected boolean isValidFile(File file){ + boolean result=false; + if(file!=null&&file.exists()){ + if (isMatched(file)) { + if(isAttemptFileLock()){ + FileChannel fc=null; + try{ + fc=new RandomAccessFile(file,"rw").getChannel(); + fc.lock(); + result=true; + }catch(Throwable e){ + }finally{ + if(fc!=null){ + try{ + fc.close(); + }catch(IOException e){ + } + } + } + } + } + } + return result; + } + + protected boolean isMatched(File file) { + boolean result = true; + if ( regexPattern != null && regexPattern.length() > 0 ) { + result = file.getName().matches(getRegexPattern()); + } + return result; + } + + /** + * @return the recursive + */ + public boolean isRecursive(){ + return this.recursive; + } + + /** + * @param recursive the recursive to set + */ + public void setRecursive(boolean recursive){ + this.recursive=recursive; + } + + /** + * @return the attemptFileLock + */ + public boolean isAttemptFileLock(){ + return this.attemptFileLock; + } + + /** + * @param attemptFileLock the attemptFileLock to set + */ + public void setAttemptFileLock(boolean checkAppending){ + this.attemptFileLock=checkAppending; + } + + /** + * @return the regexPattern + */ + public String getRegexPattern(){ + return this.regexPattern; + } + + /** + * @param regexPattern the regexPattern to set + */ + public void setRegexPattern(String regexPattern){ + this.regexPattern=regexPattern; + } + +} Propchange: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java?view=auto&rev=531566 ============================================================================== --- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java (added) +++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java Mon Apr 23 12:17:31 2007 @@ -0,0 +1,105 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.File; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; + +/** + * @version $Revision: 523016 $ + */ +public class FileEndpoint extends DefaultEndpoint { + private File file; + protected FileEndpoint(File file,String endpointUri, Component component){ + super(endpointUri,component); + this.file = file; + } + + + private ScheduledExecutorService executor; + + /** + * @param file + * @return a Consumer + * @throws Exception + * @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor) + */ + public Consumer createConsumer(Processor file) throws Exception{ + return new FileConsumer(this, file, executor); + } + + /** + * @param file + * @return a FileExchange + * @see org.apache.camel.Endpoint#createExchange() + */ + public FileExchange createExchange(File file){ + return new FileExchange(getContext(),file); + } + + /** + * @return an Exchange + * @see org.apache.camel.Endpoint#createExchange() + */ + public FileExchange createExchange(){ + return createExchange(this.file); + } + + + /** + * @return a Producer + * @throws Exception + * @see org.apache.camel.Endpoint#createProducer() + */ + public Producer createProducer() throws Exception{ + return new FileProducer(this); + } + + + /** + * @return the executor + */ + public synchronized ScheduledExecutorService getExecutor(){ + if (this.executor==null) { + this.executor=new ScheduledThreadPoolExecutor(10); + } + return executor; + } + + + /** + * @param executor the executor to set + */ + public synchronized void setExecutor(ScheduledExecutorService executor){ + this.executor=executor; + } + + public File getFile() { + return file; + } + + + + +} Propchange: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java?view=auto&rev=531566 ============================================================================== --- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java (added) +++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java Mon Apr 23 12:17:31 2007 @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.File; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; + +/** + * A {@link Exchange} for MINA + * + * @version $Revision: 520985 $ + */ +public class FileExchange extends DefaultExchange { + + private File file; + /** + * Constructor + * @param camelContext + * @param file + */ + public FileExchange(CamelContext camelContext, File file) { + super(camelContext); + setIn(new FileMessage(file)); + this.file = file; + } + + /** + * @return the file + */ + public File getFile(){ + return this.file; + } + + /** + * @param file the file to set + */ + public void setFile(File file){ + this.file=file; + } +} Propchange: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileExchange.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileMessage.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileMessage.java?view=auto&rev=531566 ============================================================================== --- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileMessage.java (added) +++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileMessage.java Mon Apr 23 12:17:31 2007 @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.File; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultMessage; + +/** + * A {@link Exchange} for MINA + * + * @version $Revision: 520985 $ + */ +public class FileMessage extends DefaultMessage{ + private File file; + + public FileMessage() { + this(new File(".")); + } + + public FileMessage(File file) { + this.file=file; + } + + @Override + public String toString() { + return "FileMessage: " + file; + } + + @Override + public FileExchange getExchange() { + return (FileExchange) super.getExchange(); + } + + + public File getFile() { + return file; + } + + public void setFile(File file) { + this.file=file; + } + + + + @Override + public FileMessage newInstance() { + return new FileMessage(); + } + +} Propchange: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileMessage.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java?view=auto&rev=531566 ============================================================================== --- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java (added) +++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java Mon Apr 23 12:17:31 2007 @@ -0,0 +1,61 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.camel.component.file; + +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultProducer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A {@link Producer} implementation for MINA + * + * @version $Revision: 523016 $ + */ +public class FileProducer extends DefaultProducer{ + private static final transient Log log = LogFactory.getLog(FileProducer.class); + + private final FileEndpoint endpoint; + public FileProducer(FileEndpoint endpoint){ + super(endpoint); + this.endpoint = endpoint; + } + + /** + * @param arg0 + * @see org.apache.camel.Processor#process(java.lang.Object) + */ + public void process(FileExchange exchange){ + ByteBuffer payload = exchange.getIn().getBody(ByteBuffer.class); + File file = null; + if (endpoint.getFile() != null && endpoint.getFile().isDirectory()) { + file = new File(endpoint.getFile(),exchange.getFile().getName()); + }else { + file = exchange.getFile(); + } + try{ + FileChannel fc=new RandomAccessFile(file,"rw").getChannel(); + fc.position(fc.size()); + fc.write(payload); + fc.close(); + }catch(Throwable e){ + log.error("Failed to write to File: " + file,e); + } + } +} Propchange: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileProducer.java ------------------------------------------------------------------------------ svn:eol-style = native