Return-Path: X-Original-To: apmail-incubator-cloudstack-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-cloudstack-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9D770D170 for ; Thu, 6 Sep 2012 21:25:55 +0000 (UTC) Received: (qmail 16733 invoked by uid 500); 6 Sep 2012 21:25:53 -0000 Delivered-To: apmail-incubator-cloudstack-commits-archive@incubator.apache.org Received: (qmail 16607 invoked by uid 500); 6 Sep 2012 21:25:52 -0000 Mailing-List: contact cloudstack-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cloudstack-dev@incubator.apache.org Delivered-To: mailing list cloudstack-commits@incubator.apache.org Received: (qmail 16285 invoked by uid 99); 6 Sep 2012 21:25:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Sep 2012 21:25:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2B9F01BC57; Thu, 6 Sep 2012 21:25:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: edison@apache.org To: cloudstack-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [11/15] git commit: Add initial support for Caringo CAStor object store as backing store for S3 API Message-Id: <20120906212552.2B9F01BC57@tyr.zones.apache.org> Date: Thu, 6 Sep 2012 21:25:52 +0000 (UTC) Add initial support for Caringo CAStor object store as backing store for S3 API Signed-off-by: Chiradeep Vittal Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/5d208a5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/5d208a5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/5d208a5c Branch: refs/heads/4.0 Commit: 5d208a5c95bfd6f22332bf1f5af8c4a13749c49a Parents: 5ae15f8 Author: Jamshid Afshar Authored: Wed Sep 5 23:35:09 2012 -0700 Committer: Chiradeep Vittal Committed: Wed Sep 5 23:35:09 2012 -0700 ---------------------------------------------------------------------- NOTICE | 37 ++ awsapi/pom.xml | 5 + .../com/cloud/bridge/io/S3CAStorBucketAdapter.java | 479 +++++++++++++++ awsapi/src/com/cloud/bridge/model/SHost.java | 4 +- .../service/controller/s3/ServiceProvider.java | 23 +- .../com/cloud/bridge/service/core/s3/S3Engine.java | 6 + 6 files changed, 551 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index 9c124ab..13f608e 100644 --- a/NOTICE +++ b/NOTICE @@ -657,3 +657,40 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ================================================================================ + + + For + CAStorSDK.jar + + This product includes CAStorSDK (http://www.caringo.com/) + under the BSD License + + ================================================================================ + Copyright (c) 2009, Caringo, Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name of Caringo, Inc. nor the names of its contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ================================================================================ http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/pom.xml ---------------------------------------------------------------------- diff --git a/awsapi/pom.xml b/awsapi/pom.xml index cf91033..cc30062 100644 --- a/awsapi/pom.xml +++ b/awsapi/pom.xml @@ -94,6 +94,11 @@ jasypt ${cs.jasypt.version} + + com.caringo.client + CAStorSDK + 1.3.1-CS40 + install http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java ---------------------------------------------------------------------- diff --git a/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java b/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java new file mode 100644 index 0000000..ad6f6cd --- /dev/null +++ b/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java @@ -0,0 +1,479 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.bridge.io; + +import java.util.Arrays; +import java.util.HashSet; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import javax.activation.DataHandler; +import javax.activation.DataSource; + +import org.apache.log4j.Logger; + +import com.cloud.bridge.service.core.s3.S3BucketAdapter; +import com.cloud.bridge.service.core.s3.S3MultipartPart; +import com.cloud.bridge.service.exception.ConfigurationException; +import com.cloud.bridge.service.exception.FileNotExistException; +import com.cloud.bridge.service.exception.InternalErrorException; +import com.cloud.bridge.service.exception.OutOfStorageException; +import com.cloud.bridge.service.exception.UnsupportedException; +import com.cloud.bridge.util.StringHelper; +import com.cloud.bridge.util.OrderedPair; + +import com.caringo.client.locate.Locator; +import com.caringo.client.locate.StaticLocator; +import com.caringo.client.locate.ZeroconfLocator; +import com.caringo.client.ResettableFileInputStream; +import com.caringo.client.ScspClient; +import com.caringo.client.ScspExecutionException; +import com.caringo.client.ScspHeaders; +import com.caringo.client.ScspQueryArgs; +import com.caringo.client.ScspResponse; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.Header; + +/** + * Creates an SCSP client to a CAStor cluster, configured in "storage.root", + * and use CAStor as the back-end storage instead of a file system. + */ +public class S3CAStorBucketAdapter implements S3BucketAdapter { + protected final static Logger s_logger = Logger.getLogger(S3CAStorBucketAdapter.class); + + private static final int HTTP_OK = 200; + private static final int HTTP_CREATED = 201; + private static final int HTTP_UNSUCCESSFUL = 300; + private static final int HTTP_PRECONDITION_FAILED = 412; + + // For ScspClient + private static final int DEFAULT_SCSP_PORT = 80; + private static final int DEFAULT_MAX_POOL_SIZE = 50; + private static final int DEFAULT_MAX_RETRIES = 5; + private static final int CONNECTION_TIMEOUT = 60 * 1000; // Request activity timeout - 1 minute + private static final int CM_IDLE_TIMEOUT = 60 * 1000; // HttpConnectionManager idle timeout - 1 minute + private static final int LOCATOR_RETRY_TIMEOUT = 0; // StaticLocator pool retry timeout + + private ScspClient _scspClient; // talks to CAStor cluster + private Locator _locator; // maintains list of CAStor nodes + private String _domain; // domain where all CloudStack streams will live + + private synchronized ScspClient myClient(String mountedRoot) { + if (_scspClient!=null) { + return _scspClient; + } + // The castor cluster is specified either by listing the ip addresses of some nodes, or + // by specifying "zeroconf=" and the cluster's mdns name -- this is "cluster" in castor's node.cfg. + // The "domain" to store streams can be specified. If not specified, streams will be written + // without a "domain" query arg, so they will go into the castor default domain. + // The port is optional and must be at the end of the config string, defaults to 80. + // Examples: "castor 172.16.78.130 172.16.78.131 80", "castor 172.16.78.130 domain=mycluster.example.com", + // "castor zeroconf=mycluster.example.com domain=mycluster.example.com 80" + String[] cfg = mountedRoot.split(" "); + int numIPs = cfg.length-1; + String possiblePort = cfg[cfg.length-1]; + int castorPort = DEFAULT_SCSP_PORT; + try { + castorPort = Integer.parseInt(possiblePort); + --numIPs; + } catch (NumberFormatException nfe) { + // okay, it's an ip address, not a port number + } + if (numIPs <= 0) { + throw new ConfigurationException("No CAStor nodes specified in '" + mountedRoot + "'"); + } + HashSet ips = new HashSet(); + String clusterName = null; + for ( int i = 0; i < numIPs; ++i ) { + String option = cfg[i+1]; // ip address or zeroconf=mycluster.example.com or domain=mydomain.example.com + if (option.toLowerCase().startsWith("zeroconf=")) { + String[] confStr = option.split("="); + if (confStr.length != 2) { + throw new ConfigurationException("Could not parse cluster name from '" + option + "'"); + } + clusterName = confStr[1]; + } else if (option.toLowerCase().startsWith("domain=")) { + String[] confStr = option.split("="); + if (confStr.length != 2) { + throw new ConfigurationException("Could not parse domain name from '" + option + "'"); + } + _domain = confStr[1]; + } else { + ips.add(option); + } + } + if (clusterName == null && ips.isEmpty()) { + throw new ConfigurationException("No CAStor nodes specified in '" + mountedRoot + "'"); + } + String[] castorNodes = ips.toArray(new String[0]); // list of configured nodes + if (clusterName == null) { + try { + _locator = new StaticLocator(castorNodes, castorPort, LOCATOR_RETRY_TIMEOUT); + _locator.start(); + } catch (IOException e) { + throw new ConfigurationException("Could not create CAStor static locator for '" + + Arrays.toString(castorNodes) + "'"); + } + } else { + try { + clusterName = clusterName.replace(".", "_"); // workaround needed for CAStorSDK 1.3.1 + _locator = new ZeroconfLocator(clusterName); + _locator.start(); + } catch (IOException e) { + throw new ConfigurationException("Could not create CAStor zeroconf locator for '" + clusterName + "'"); + } + } + try { + s_logger.info("CAStor client starting: " + (_domain==null ? "default domain" : "domain " + _domain) + " " + (clusterName==null ? Arrays.toString(castorNodes) : clusterName) + " :" + castorPort); + _scspClient = new ScspClient(_locator, castorPort, DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_RETRIES, CONNECTION_TIMEOUT, CM_IDLE_TIMEOUT); + _scspClient.start(); + } catch (Exception e) { + s_logger.error("Unable to create CAStor client for '" + mountedRoot + "': " + e.getMessage(), e); + throw new ConfigurationException("Unable to create CAStor client for '" + mountedRoot + "': " + e); + } + return _scspClient; + } + + private String castorURL(String mountedRoot, String bucket, String fileName) { + // TODO: Replace this method with access to ScspClient's Locator, + // or add read method that returns the body as an unread + // InputStream for use by loadObject() and loadObjectRange(). + + myClient(mountedRoot); // make sure castorNodes and castorPort initialized + InetSocketAddress nodeAddr = _locator.locate(); + if (nodeAddr == null) { + throw new ConfigurationException("Unable to locate CAStor node with locator " + _locator); + } + InetAddress nodeInetAddr = nodeAddr.getAddress(); + if (nodeInetAddr == null) { + _locator.foundDead(nodeAddr); + throw new ConfigurationException("Unable to resolve CAStor node name '" + nodeAddr.getHostName() + + "' to IP address"); + } + return "http://" + nodeInetAddr.getHostAddress() + ":" + nodeAddr.getPort() + "/" + bucket + "/" + fileName + + (_domain==null ? "" : "?domain=" + _domain); + } + + private ScspQueryArgs domainQueryArg() { + ScspQueryArgs qa = new ScspQueryArgs(); + if (this._domain != null) + qa.setValue("domain", this._domain); + return qa; + } + + public S3CAStorBucketAdapter() { + // TODO: is there any way to initialize CAStor client here, can it + // get to config? + } + + @Override + public void createContainer(String mountedRoot, String bucket) { + try { + ScspResponse bwResponse = myClient(mountedRoot).write(bucket, new ByteArrayInputStream("".getBytes()), 0, domainQueryArg(), new ScspHeaders()); + if (bwResponse.getHttpStatusCode() != HTTP_CREATED) { + if (bwResponse.getHttpStatusCode() == HTTP_PRECONDITION_FAILED) + s_logger.error("CAStor unable to create bucket " + bucket + " because domain " + + (this._domain==null ? "(default)" : this._domain) + " does not exist"); + else + s_logger.error("CAStor unable to create bucket " + bucket + ": " + bwResponse.getHttpStatusCode()); + throw new OutOfStorageException("CAStor unable to create bucket " + bucket + ": " + + bwResponse.getHttpStatusCode()); + } + } catch (ScspExecutionException e) { + s_logger.error("CAStor unable to create bucket " + bucket, e); + throw new OutOfStorageException("CAStor unable to create bucket " + bucket + ": " + e.getMessage()); + } + } + + @Override + public void deleteContainer(String mountedRoot, String bucket) { + try { + ScspResponse bwResponse = myClient(mountedRoot).delete("", bucket, domainQueryArg(), new ScspHeaders()); + if (bwResponse.getHttpStatusCode() >= HTTP_UNSUCCESSFUL) { + s_logger.error("CAStor unable to delete bucket " + bucket + ": " + bwResponse.getHttpStatusCode()); + throw new OutOfStorageException("CAStor unable to delete bucket " + bucket + ": " + + bwResponse.getHttpStatusCode()); + } + } catch (ScspExecutionException e) { + s_logger.error("CAStor unable to delete bucket " + bucket, e); + throw new OutOfStorageException("CAStor unable to delete bucket " + bucket + ": " + e.getMessage()); + } + } + + @Override + public String saveObject(InputStream is, String mountedRoot, String bucket, String fileName) + { + // TODO: Currently this writes the object to a temporary file, + // so that the MD5 can be computed and so that we have the + // stream length needed by this version of CAStor SDK. Will + // change to calculate MD5 while streaming to CAStor and to + // either pass Content-length to this method or use newer SDK + // that doesn't require it. + + FileOutputStream fos = null; + MessageDigest md5 = null; + + try { + md5 = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + s_logger.error("Unexpected exception " + e.getMessage(), e); + throw new InternalErrorException("Unable to get MD5 MessageDigest", e); + } + + File spoolFile = null; + try { + spoolFile = File.createTempFile("castor", null); + } catch (IOException e) { + s_logger.error("Unexpected exception creating temporary CAStor spool file: " + e.getMessage(), e); + throw new InternalErrorException("Unable to create temporary CAStor spool file", e); + } + try { + String retVal; + int streamLen = 0; + try { + fos = new FileOutputStream(spoolFile); + byte[] buffer = new byte[4096]; + int len = 0; + while( (len = is.read(buffer)) > 0) { + fos.write(buffer, 0, len); + streamLen = streamLen + len; + md5.update(buffer, 0, len); + + } + //Convert MD5 digest to (lowercase) hex String + retVal = StringHelper.toHexString(md5.digest()); + + } catch(IOException e) { + s_logger.error("Unexpected exception " + e.getMessage(), e); + throw new OutOfStorageException(e); + } finally { + try { + if (null != fos) + fos.close(); + } catch( Exception e ) { + s_logger.error("Can't close CAStor spool file " + + spoolFile.getAbsolutePath() + ": " + e.getMessage(), e); + throw new OutOfStorageException("Unable to close CAStor spool file: " + e.getMessage(), e); + } + } + + try { + ScspResponse bwResponse = + myClient(mountedRoot).write(bucket + "/" + fileName, + new ResettableFileInputStream(spoolFile), streamLen, + domainQueryArg(), new ScspHeaders()); + if (bwResponse.getHttpStatusCode() >= HTTP_UNSUCCESSFUL) { + s_logger.error("CAStor write responded with error " + bwResponse.getHttpStatusCode()); + throw new OutOfStorageException("Unable to write object to CAStor " + + bucket + "/" + fileName + ": " + bwResponse.getHttpStatusCode()); + } + } catch (ScspExecutionException e) { + s_logger.error("Unable to write object to CAStor " + bucket + "/" + fileName, e); + throw new OutOfStorageException("Unable to write object to CAStor " + bucket + "/" + fileName + ": " + + e.getMessage()); + } catch (IOException ie) { + s_logger.error("Unable to write object to CAStor " + bucket + "/" + fileName, ie); + throw new OutOfStorageException("Unable to write object to CAStor " + bucket + "/" + fileName + ": " + + ie.getMessage()); + } + return retVal; + } finally { + try { + if (!spoolFile.delete()) { + s_logger.error("Failed to delete CAStor spool file " + spoolFile.getAbsolutePath()); + } + } catch (SecurityException e) { + s_logger.error("Unable to delete CAStor spool file " + spoolFile.getAbsolutePath(), e); + } + } + } + + /** + * From a list of files (each being one part of the multipart upload), concatentate all files into a single + * object that can be accessed by normal S3 calls. This function could take a long time since a multipart is + * allowed to have upto 10,000 parts (each 5 gib long). Amazon defines that while this operation is in progress + * whitespace is sent back to the client inorder to keep the HTTP connection alive. + * + * @param mountedRoot - where both the source and dest buckets are located + * @param destBucket - resulting location of the concatenated objects + * @param fileName - resulting file name of the concatenated objects + * @param sourceBucket - special bucket used to save uploaded file parts + * @param parts - an array of file names in the sourceBucket + * @param client - if not null, then keep the servlet connection alive while this potentially long concatentation takes place + * @return OrderedPair with the first value the MD5 of the final object, and the second value the length of the final object + */ + @Override + public OrderedPair concatentateObjects(String mountedRoot, String destBucket, String fileName, String sourceBucket, S3MultipartPart[] parts, OutputStream client) + { + // TODO + throw new UnsupportedException("Multipart upload support not yet implemented in CAStor plugin"); + + /* + MessageDigest md5; + long totalLength = 0; + + try { + md5 = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + s_logger.error("Unexpected exception " + e.getMessage(), e); + throw new InternalErrorException("Unable to get MD5 MessageDigest", e); + } + + File file = new File(getBucketFolderDir(mountedRoot, destBucket) + File.separatorChar + fileName); + try { + // -> when versioning is off we need to rewrite the file contents + file.delete(); + file.createNewFile(); + + final FileOutputStream fos = new FileOutputStream(file); + byte[] buffer = new byte[4096]; + + // -> get the input stream for the next file part + for( int i=0; i < parts.length; i++ ) + { + DataHandler nextPart = loadObject( mountedRoot, sourceBucket, parts[i].getPath()); + InputStream is = nextPart.getInputStream(); + + int len = 0; + while( (len = is.read(buffer)) > 0) { + fos.write(buffer, 0, len); + md5.update(buffer, 0, len); + totalLength += len; + } + is.close(); + + // -> after each file write tell the client we are still here to keep connection alive + if (null != client) { + client.write( new String(" ").getBytes()); + client.flush(); + } + } + fos.close(); + return new OrderedPair(StringHelper.toHexString(md5.digest()), new Long(totalLength)); + //Create an ordered pair whose first element is the MD4 digest as a (lowercase) hex String + } + catch(IOException e) { + s_logger.error("concatentateObjects unexpected exception " + e.getMessage(), e); + throw new OutOfStorageException(e); + } + */ + } + + @Override + public DataHandler loadObject(String mountedRoot, String bucket, String fileName) { + try { + return new DataHandler(new URL(castorURL(mountedRoot, bucket, fileName))); + } catch (MalformedURLException e) { + s_logger.error("Failed to loadObject from CAStor", e); + throw new FileNotExistException("Unable to load object from CAStor: " + e.getMessage()); + } + } + + @Override + public void deleteObject(String mountedRoot, String bucket, String fileName) { + String filePath = bucket + "/" + fileName; + try { + ScspResponse bwResponse = myClient(mountedRoot).delete("", filePath, domainQueryArg(), new ScspHeaders()); + if (bwResponse.getHttpStatusCode() != HTTP_OK) { + s_logger.error("CAStor delete object responded with error " + bwResponse.getHttpStatusCode()); + throw new OutOfStorageException("CAStor unable to delete object " + filePath + ": " + + bwResponse.getHttpStatusCode()); + } + } catch (ScspExecutionException e) { + s_logger.error("CAStor unable to delete object " + filePath, e); + throw new OutOfStorageException("CAStor unable to delete object " + filePath + ": " + e.getMessage()); + } + } + + public class ScspDataSource implements DataSource { + GetMethod method; + public ScspDataSource(GetMethod m) { + method = m; + } + @Override + public String getContentType() { + Header h = method.getResponseHeader("Content-type"); + return h==null ? null : h.getValue(); + } + @Override + public InputStream getInputStream() throws IOException { + try { + return method.getResponseBodyAsStream(); + } catch (Exception e) { + s_logger.error("CAStor loadObjectRange getInputStream error", e); + return null; + } + } + @Override + public String getName() { + assert(false); + return null; + } + @Override + public OutputStream getOutputStream() throws IOException { + assert(false); + return null; + } + } + + @Override + public DataHandler loadObjectRange(String mountedRoot, String bucket, String fileName, long startPos, long endPos) { + try { + HttpClient httpClient = new HttpClient(); + // Create a method instance. + GetMethod method = new GetMethod(castorURL(mountedRoot, bucket, fileName)); + method.addRequestHeader("Range", "bytes=" + startPos + "-" + endPos); + int statusCode = httpClient.executeMethod(method); + if (statusCode < HTTP_OK || statusCode >= HTTP_UNSUCCESSFUL) { + s_logger.error("CAStor loadObjectRange response: "+ statusCode); + throw new FileNotExistException("CAStor loadObjectRange response: " + statusCode); + } + return new DataHandler(new ScspDataSource(method)); + } catch (Exception e) { + s_logger.error("CAStor loadObjectRange failure", e); + throw new FileNotExistException("CAStor loadObjectRange failure: " + e); + } + } + + @Override + public String getBucketFolderDir(String mountedRoot, String bucket) { + // This method shouldn't be needed and doesn't need to use + // mountedRoot (which is CAStor config values here), right? + String bucketFolder = getBucketFolderName(bucket); + return bucketFolder; + } + + private String getBucketFolderName(String bucket) { + // temporary + String name = bucket.replace(' ', '_'); + name = bucket.replace('\\', '-'); + name = bucket.replace('/', '-'); + + return name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/model/SHost.java ---------------------------------------------------------------------- diff --git a/awsapi/src/com/cloud/bridge/model/SHost.java b/awsapi/src/com/cloud/bridge/model/SHost.java index 4ed5b7e..a6a2f58 100644 --- a/awsapi/src/com/cloud/bridge/model/SHost.java +++ b/awsapi/src/com/cloud/bridge/model/SHost.java @@ -24,9 +24,11 @@ public interface SHost { public static final int STORAGE_HOST_TYPE_LOCAL = 0; public static final int STORAGE_HOST_TYPE_NFS = 1; + public static final int STORAGE_HOST_TYPE_CASTOR = 2; public static enum StorageHostType { STORAGE_HOST_TYPE_LOCAL, //0 - STORAGE_HOST_TYPE_NFS //1 + STORAGE_HOST_TYPE_NFS, //1 + STORAGE_HOST_TYPE_CASTOR //2 } /* private Long id; http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java ---------------------------------------------------------------------- diff --git a/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java b/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java index 2f1791e..2ddbbf2 100644 --- a/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java +++ b/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java @@ -243,7 +243,13 @@ public class ServiceProvider { //PersistContext.flush(); String localStorageRoot = properties.getProperty("storage.root"); - if (localStorageRoot != null) setupLocalStorage(localStorageRoot); + if (localStorageRoot != null) { + if (localStorageRoot.toLowerCase().startsWith("castor")) { + setupCAStorStorage(localStorageRoot); + } else { + setupLocalStorage(localStorageRoot); + } + } multipartDir = properties.getProperty("storage.multipartDir"); @@ -318,7 +324,20 @@ public class ServiceProvider { } } - public void shutdown() { + private void setupCAStorStorage(String storageRoot) { + SHostVO shost = shostDao.getLocalStorageHost(mhost.getId(), storageRoot); + if(shost == null) { + shost = new SHostVO(); + shost.setMhost(mhost); + shost.setMhostid(mhost.getId()); + shost.setHostType(SHost.STORAGE_HOST_TYPE_CASTOR); + shost.setHost(NetHelper.getHostName()); + shost.setExportRoot(storageRoot); + shostDao.persist(shost); + } + } + + public void shutdown() { timer.cancel(); if(logger.isInfoEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java ---------------------------------------------------------------------- diff --git a/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java b/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java index a117d13..916c51d 100644 --- a/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java +++ b/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java @@ -39,6 +39,7 @@ import org.apache.log4j.Logger; import org.json.simple.parser.ParseException; import com.cloud.bridge.io.S3FileSystemBucketAdapter; +import com.cloud.bridge.io.S3CAStorBucketAdapter; import com.cloud.bridge.model.BucketPolicyVO; import com.cloud.bridge.model.MHostMountVO; import com.cloud.bridge.model.MHostVO; @@ -115,6 +116,7 @@ public class S3Engine { public S3Engine() { bucketAdapters.put(SHost.STORAGE_HOST_TYPE_LOCAL, new S3FileSystemBucketAdapter()); + bucketAdapters.put(SHost.STORAGE_HOST_TYPE_CASTOR, new S3CAStorBucketAdapter()); } @@ -1398,6 +1400,10 @@ public class S3Engine { return new OrderedPair(shost, shost.getExportRoot()); } + if(shost.getHostType() == SHost.STORAGE_HOST_TYPE_CASTOR ) { + return new OrderedPair(shost, shost.getExportRoot()); + } + MHostMountVO mount = mountDao.getHostMount(ServiceProvider.getInstance().getManagementHostId(), shost.getId()); if(mount != null) { return new OrderedPair(shost, mount.getMountPath());