From: David Nalley
Date: Tue, 22 May 2012 15:58:28 -0400
Subject: HDFS and CloudStack? was: [SCM] CloudStack OSS branch s3-hdfs
To: cloudstack-dev@incubator.apache.org

Hi Chiradeep,

I just saw this branch created - it has a fascinating name - but sadly I can't find any docs on the wiki that talk about Hadoop or HDFS (there is a parenthetical statement on the wiki that says 'like Hadoop', but nothing much past that).

Can you tell us about your plan and objectives here? Does a functional spec exist for any of this work?

--David

-------- Original Message --------
Subject: [SCM] CloudStack OSS branch s3-hdfs created. 15dd6a25254d83076cd362d65a728fee8681a4b4
Date: Tue, 22 May 2012 15:11:14 -0400
From: git@cloud.com
To: SourceForge

This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "CloudStack OSS".
The branch, s3-hdfs has been created
        at  15dd6a25254d83076cd362d65a728fee8681a4b4 (commit)

- Log -----------------------------------------------------------------
http://git.cloud.com/cgit/cloudstack-oss/commit/?id=15dd6a25254d83076cd362d65a728fee8681a4b4

commit 15dd6a25254d83076cd362d65a728fee8681a4b4
Author: Chiradeep Vittal
Date:   Tue May 22 12:44:56 2012 -0700

    Handle HDFS as the backend

diff --git a/awsapi/.classpath b/awsapi/.classpath
index ab9174c..18f729d 100644
--- a/awsapi/.classpath
+++ b/awsapi/.classpath
@@ -4,64 +4,5 @@
    [classpath entry changes not preserved legibly in this archive]

diff --git a/awsapi/src/com/cloud/bridge/model/SHost.java b/awsapi/src/com/cloud/bridge/model/SHost.java
index 2d26968..864956c 100644
--- a/awsapi/src/com/cloud/bridge/model/SHost.java
+++ b/awsapi/src/com/cloud/bridge/model/SHost.java
@@ -27,6 +27,8 @@ public class SHost implements Serializable {
     public static final int STORAGE_HOST_TYPE_LOCAL = 0;
     public static final int STORAGE_HOST_TYPE_NFS = 1;
+    public static final int STORAGE_HOST_TYPE_HDFS = 2;
+
     private Long id;

diff --git a/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java b/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java
index 18acb1a..6b3e24e 100644
--- a/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java
+++ b/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java
@@ -26,6 +26,10 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
@@ -34,6 +38,7 @@ import java.util.Timer;
 import java.util.TimerTask;

 import org.apache.axis2.AxisFault;
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
 import org.apache.log4j.Logger;
 import org.apache.log4j.xml.DOMConfigurator;
 import org.hibernate.SessionException;
@@ -233,7 +238,12 @@ public class ServiceProvider {
         PersistContext.flush();

         String localStorageRoot = properties.getProperty("storage.root");
-        if (localStorageRoot != null) setupLocalStorage(localStorageRoot);
+        if (localStorageRoot != null) {
+            if (localStorageRoot.startsWith("hdfs"))
+                setupHdfsStorage(localStorageRoot);
+            else
+                setupLocalStorage(localStorageRoot);
+        }

         multipartDir = properties.getProperty("storage.multipartDir");
@@ -243,7 +253,29 @@ public class ServiceProvider {
         logger.info("ServiceProvider initialized");
     }

-    private void loadStartupProperties() {
+    private void setupHdfsStorage(String storageRoot) {
+        URI uri;
+        try {
+            uri = new URI(storageRoot);
+        } catch (URISyntaxException e) {
+            throw new ConfigurationException("Invalid hdfs URI: " + storageRoot + " should be hdfs://host/path");
+        }
+        SHostDao shostDao = new SHostDao();
+        SHost shost = shostDao.getLocalStorageHost(mhost.getId(), storageRoot);
+        if(shost == null) {
+            shost = new SHost();
+            shost.setMhost(mhost);
+            mhost.getLocalSHosts().add(shost);
+            shost.setHostType(SHost.STORAGE_HOST_TYPE_HDFS);
+            shost.setHost(uri.getHost());
+            //shost.setExportRoot(url.getPath());
+            shost.setExportRoot(storageRoot);
+            PersistContext.getSession().save(shost);
+        }
+
+    }
+
+    private void loadStartupProperties() {
         File propertiesFile = ConfigurationHelper.findConfigurationFile("cloud-bridge.properties");
         properties = new Properties();
         if(propertiesFile != null) {

diff --git a/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java b/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java
index 4b04d3d..4af18a3 100644
--- a/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java
+++ b/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java
@@ -18,6 +18,8 @@ package com.cloud.bridge.service.core.s3;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Calendar;
@@ -39,6 +41,7 @@ import org.hibernate.LockMode;
 import org.hibernate.Session;
 import org.json.simple.parser.ParseException;

+import com.cloud.bridge.io.HdfsBucketAdapter;
 import com.cloud.bridge.io.S3FileSystemBucketAdapter;
 import com.cloud.bridge.model.MHost;
 import com.cloud.bridge.model.MHostMount;
@@ -94,6 +97,14 @@ public class S3Engine {

     public S3Engine() {
         bucketAdapters.put(SHost.STORAGE_HOST_TYPE_LOCAL, new S3FileSystemBucketAdapter());
+        HdfsBucketAdapter hdfsAdapter = new HdfsBucketAdapter();
+        try {
+            hdfsAdapter.initialize();
+            bucketAdapters.put(SHost.STORAGE_HOST_TYPE_HDFS, hdfsAdapter);
+        } catch (IOException e) {
+            return;
+        }
+
     }
@@ -1375,7 +1386,16 @@ public class S3Engine {
         if(shost.getHostType() == SHost.STORAGE_HOST_TYPE_LOCAL) {
             return new OrderedPair(shost, shost.getExportRoot());
         }
-
+
+        if(shost.getHostType() == SHost.STORAGE_HOST_TYPE_HDFS) {
+            try {
+                URI uri = new URI(shost.getExportRoot());
+                return new OrderedPair(shost, uri.getPath());
+            } catch (URISyntaxException use) {
+                throw new InternalErrorException("Could not determine storage path for HDFS");
+            }
+        }
+
         MHostMount mount = mountDao.getHostMount(ServiceProvider.getInstance().getManagementHostId(), shost.getId());
         if(mount != null) {
             return new OrderedPair(shost, mount.getMountPath());

diff --git a/deps/.classpath b/deps/.classpath
index f704a05..66687cb 100755
--- a/deps/.classpath
+++ b/deps/.classpath
@@ -1,65 +1,71 @@
    [classpath entry changes not preserved legibly in this archive]

diff --git a/deps/awsapi-lib/xercesImpl.jar b/deps/awsapi-lib/xercesImpl.jar
index 3b351f6..0aaa990 100644
Binary files a/deps/awsapi-lib/xercesImpl.jar and b/deps/awsapi-lib/xercesImpl.jar differ

diff --git a/deps/awsapi-lib/xml-apis.jar b/deps/awsapi-lib/xml-apis.jar
index b338fb6..4673346 100644
Binary files a/deps/awsapi-lib/xml-apis.jar and b/deps/awsapi-lib/xml-apis.jar differ
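For context on the ServiceProvider and S3Engine hunks above: setupHdfsStorage() stores the whole hdfs:// root on the SHost row and its host component separately, while S3Engine later hands only the path component to the bucket adapter as the mounted root. A minimal, self-contained java.net.URI sketch of that split; the namenode address and path are assumed example values, not taken from the commit:

import java.net.URI;
import java.net.URISyntaxException;

public class HdfsRootUriSketch {
    public static void main(String[] args) throws URISyntaxException {
        // Assumed example value for the storage.root property ("should be hdfs://host/path").
        URI root = new URI("hdfs://namenode:8020/bucketstore");
        System.out.println(root.getHost());  // namenode      -> saved via shost.setHost(...)
        System.out.println(root.getPath());  // /bucketstore  -> returned as the adapter's mounted root
    }
}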
http://git.cloud.com/cgit/cloudstack-oss/commit/?id=52c59e2df65b6b7345a8228dbe3a9945644cf4a8

commit 52c59e2df65b6b7345a8228dbe3a9945644cf4a8
Author: Chiradeep Vittal
Date:   Tue May 22 12:44:02 2012 -0700

    Initial commit of HDFS-related files

diff --git a/awsapi/src/com/cloud/bridge/io/HdfsBucketAdapter.java b/awsapi/src/com/cloud/bridge/io/HdfsBucketAdapter.java
new file mode 100644
index 0000000..7a10f15
--- /dev/null
+++ b/awsapi/src/com/cloud/bridge/io/HdfsBucketAdapter.java
@@ -0,0 +1,117 @@
+package com.cloud.bridge.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.activation.DataHandler;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.bridge.service.core.s3.S3BucketAdapter;
+import com.cloud.bridge.service.core.s3.S3MultipartPart;
+import com.cloud.bridge.service.exception.InternalErrorException;
+import com.cloud.bridge.service.exception.OutOfStorageException;
+import com.cloud.bridge.service.exception.UnsupportedException;
+import com.cloud.bridge.util.OrderedPair;
+
+public class HdfsBucketAdapter implements S3BucketAdapter {
+    protected final static Logger logger = Logger.getLogger(HdfsBucketAdapter.class);
+
+    HdfsClient _client = new HdfsClient();
+
+    public HdfsBucketAdapter() {
+    }
+
+    public void initialize() throws IOException {
+        _client.initialize();
+    }
+
+    @Override
+    public void createContainer(String mountedRoot, String bucket) {
+        String dir = getBucketFolderDir(mountedRoot, bucket);
+        try {
+            _client.mkdir(dir);
+        } catch (IOException e) {
+            throw new OutOfStorageException("Unable to create container " + bucket, e);
+        }
+
+    }
+
+    @Override
+    public void deleteContainer(String mountedRoot, String bucket) {
+        String dir = getBucketFolderDir(mountedRoot, bucket);
+        try {
+            _client.deleteFile(dir);
+        } catch (IOException e) {
+            throw new OutOfStorageException("Unable to delete " + dir + " for bucket " + bucket);
+        }
+
+    }
+
+    @Override
+    public String getBucketFolderDir(String mountedRoot, String bucket) {
+        String bucketFolder = getBucketFolderName(bucket);
+        String dir;
+        String separator = "" + File.separatorChar;
+        if(!mountedRoot.endsWith(separator))
+            dir = mountedRoot + separator + bucketFolder;
+        else
+            dir = mountedRoot + bucketFolder;
+
+        return dir;
+    }
+
+    @Override
+    public String saveObject(InputStream is, String mountedRoot, String bucket, String fileName) {
+        String name = getBucketFolderDir(mountedRoot, bucket) + File.separatorChar + fileName;
+        try {
+            return _client.saveFile(is, name);
+        } catch (IOException e) {
+            throw new OutOfStorageException(e);
+        }
+    }
+
+    @Override
+    public DataHandler loadObject(String mountedRoot, String bucket, String fileName) {
+        String src = getBucketFolderDir(mountedRoot, bucket) + File.separatorChar + fileName;
+        try {
+            return _client.readFile(src);
+        } catch (IOException e) {
+            throw new OutOfStorageException(e);
+        }
+    }
+
+    @Override
+    public DataHandler loadObjectRange(String mountedRoot, String bucket, String fileName, long startPos, long endPos) {
+        String src = getBucketFolderDir(mountedRoot, bucket) + File.separatorChar + fileName;
+        return _client.readRange(src, startPos, endPos);
+    }
+
+    @Override
+    public void deleteObject(String mountedRoot, String bucket, String fileName) {
+        String src = getBucketFolderDir(mountedRoot, bucket) + File.separatorChar + fileName;
+        try {
+            _client.deleteFile(src);
+        } catch (IOException e) {
+            throw new InternalErrorException("Unable to delete object " + src, e);
+        }
+
+    }
+
+    @Override
+    public OrderedPair concatentateObjects(String mountedRoot, String destBucket, String fileName, String sourceBucket, S3MultipartPart[] parts, OutputStream client) {
+        throw new UnsupportedException("HDFS backing store does not support concatenate yet");
+        //TODO
+    }
+
+    private String getBucketFolderName(String bucket) {
+        // temporary
+        String name = bucket.replace(' ', '_');
+        name = bucket.replace('\\', '-');
+        name = bucket.replace('/', '-');
+
+        return name;
+    }
+}
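As a usage sketch of how this adapter plugs into the per-host-type registry that the first commit adds to S3Engine: the engine picks the adapter by the storage host's type and hands it the mounted root plus bucket/object names. This fragment assumes the com.cloud.bridge classes shown in these commits are on the classpath; the mounted root and bucket name are made-up example values, not anything from the commits:

import java.util.HashMap;
import java.util.Map;

import com.cloud.bridge.io.HdfsBucketAdapter;
import com.cloud.bridge.io.S3FileSystemBucketAdapter;
import com.cloud.bridge.model.SHost;
import com.cloud.bridge.service.core.s3.S3BucketAdapter;

public class BucketAdapterDispatchSketch {
    public static void main(String[] args) throws Exception {
        Map<Integer, S3BucketAdapter> bucketAdapters = new HashMap<Integer, S3BucketAdapter>();
        bucketAdapters.put(SHost.STORAGE_HOST_TYPE_LOCAL, new S3FileSystemBucketAdapter());

        HdfsBucketAdapter hdfsAdapter = new HdfsBucketAdapter();
        hdfsAdapter.initialize();  // connects using core-site.xml / hdfs-site.xml (see HdfsClient below)
        bucketAdapters.put(SHost.STORAGE_HOST_TYPE_HDFS, hdfsAdapter);

        // Per request, look the adapter up by the storage host's type and call it.
        S3BucketAdapter adapter = bucketAdapters.get(SHost.STORAGE_HOST_TYPE_HDFS);
        adapter.createContainer("/bucketstore", "demo-bucket");  // assumed example arguments
    }
}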
diff --git a/awsapi/src/com/cloud/bridge/io/HdfsClient.java b/awsapi/src/com/cloud/bridge/io/HdfsClient.java
new file mode 100644
index 0000000..0079a53
--- /dev/null
+++ b/awsapi/src/com/cloud/bridge/io/HdfsClient.java
@@ -0,0 +1,175 @@
+package com.cloud.bridge.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.cloud.bridge.service.exception.FileNotExistException;
+import com.cloud.bridge.service.exception.InternalErrorException;
+import com.cloud.bridge.service.exception.OutOfStorageException;
+import com.cloud.bridge.util.ConfigurationHelper;
+import com.cloud.bridge.util.StringHelper;
+
+public class HdfsClient {
+    private static final class HdfsDataSource implements DataSource {
+        InputStream istream;
+
+        public HdfsDataSource(InputStream in) {
+            istream = in;
+        }
+
+        @Override
+        public OutputStream getOutputStream() throws IOException {
+            return null; //TODO
+        }
+
+        @Override
+        public String getName() {
+            return this.getClass().getName();
+        }
+
+        @Override
+        public InputStream getInputStream() throws IOException {
+            return istream;
+        }
+
+        @Override
+        public String getContentType() {
+            return "application/octet-stream";
+        }
+    }
+
+    protected final static Logger logger = Logger.getLogger(HdfsClient.class);
+
+    Configuration conf = new Configuration();
+    FileSystem fileSystem;
+    boolean initialized = false;
+
+
+    public HdfsClient() {
+
+    }
+
+    public void initialize() throws IOException {
+        if (initialized) {
+            return;
+        }
+        File coreCfg = ConfigurationHelper.findConfigurationFile("core-site.xml");
+        File siteCfg = ConfigurationHelper.findConfigurationFile("hdfs-site.xml");
+        conf.addResource(coreCfg.getAbsolutePath());
+        conf.addResource(siteCfg.getAbsolutePath());
+        fileSystem = FileSystem.get(conf);
+        initialized = true;
+
+    }
+
+    public String saveFile(InputStream is, String dest) throws IOException {
+
+        Path path = new Path(dest);
+
+        FSDataOutputStream fos = null;
+        MessageDigest md5 = null;
+
+        try {
+            md5 = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            throw new InternalErrorException("Unable to get MD5 MessageDigest", e);
+        }
+
+        try {
+            // -> when versioning is off we need to rewrite the file contents
+            fileSystem.delete(path, true);
+            fos = fileSystem.create(path);
+
+            byte[] buffer = new byte[4096];
+            int len = 0;
+            while( (len = is.read(buffer)) > 0) {
+                fos.write(buffer, 0, len);
+                md5.update(buffer, 0, len);
+
+            }
+            //Convert MD5 digest to (lowercase) hex String
+            return StringHelper.toHexString(md5.digest());
+
+        }
+        catch(IOException e) {
+            throw new OutOfStorageException(e);
+        }
+        finally {
+            try {
+                if (null != fos) fos.close();
+            }
+            catch( Exception e ) {
+                logger.error("Can't close FileOutputStream " + e.getMessage(), e);
+            }
+        }
+
+    }
+
+    public DataHandler readFile(String file) throws IOException {
+
+        Path path = new Path(file);
+        if (!fileSystem.exists(path)) {
+            throw new FileNotExistException("Unable to open underlying object file");
+        }
+
+        FSDataInputStream in = fileSystem.open(path);
+
+        return new DataHandler(new HdfsDataSource(in));
+    }
+
+    public DataHandler readRange(String file, long startPos, long endPos) {
+        try {
+            DataSource ds = new HdfsRangeDataSource(fileSystem, file, startPos, endPos);
+            return new DataHandler(ds);
+        } catch(IOException e) {
+            throw new FileNotExistException("Unable to open underlying object file");
+        }
+    }
+
+    public void deleteFile(String file) throws IOException {
+
+        Path path = new Path(file);
+        if (!fileSystem.exists(path)) {
+            throw new FileNotExistException("File " + file + " does not exist");
+        }
+        fileSystem.delete(path, true);
+
+    }
+
+    public void mkdir(String dir) throws IOException {
+
+        Path path = new Path(dir);
+        if (fileSystem.exists(path)) {
+            return;
+        }
+
+        fileSystem.mkdirs(path);
+
+    }
+
+    public void createFile(String fileName) throws IOException {
+
+        Path path = new Path(fileName);
+        if (fileSystem.exists(path)) {
+            return;
+        }
+
+        fileSystem.create(path);
+
+    }
+
+
+}
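saveFile() above computes the object's MD5 while copying the stream and returns the lowercase hex digest. A self-contained sketch of that copy-while-digesting pattern, with plain JDK streams standing in for HDFS's FSDataOutputStream; the class and method names here are illustrative only:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class StreamingMd5Sketch {
    // Copy the stream to the destination while feeding the same buffer to MD5,
    // then return the lowercase hex digest (what saveFile() hands back to the bridge).
    static String copyAndDigest(InputStream in, OutputStream out)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] buffer = new byte[4096];
        int len;
        while ((len = in.read(buffer)) > 0) {
            out.write(buffer, 0, len);
            md5.update(buffer, 0, len);
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md5.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        InputStream in = new ByteArrayInputStream("hello".getBytes("UTF-8"));
        OutputStream out = new ByteArrayOutputStream();
        System.out.println(copyAndDigest(in, out));  // 5d41402abc4b2a76b9719d911017c592
    }
}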
diff --git a/awsapi/src/com/cloud/bridge/io/HdfsRangeDataSource.java b/awsapi/src/com/cloud/bridge/io/HdfsRangeDataSource.java
new file mode 100644
index 0000000..0f585ff
--- /dev/null
+++ b/awsapi/src/com/cloud/bridge/io/HdfsRangeDataSource.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloud.bridge.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.activation.DataSource;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+
+public class HdfsRangeDataSource implements DataSource {
+    protected final static Logger logger = Logger.getLogger(HdfsRangeDataSource.class);
+
+    private HdfsRangeInputStream is;
+
+    public HdfsRangeDataSource(FileSystem fs, String src, long startPos, long endPos) throws IOException {
+        is = new HdfsRangeInputStream(fs, new Path(src), startPos, endPos);
+    }
+
+    @Override
+    public String getContentType() {
+        assert(false);
+        return null;
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return is;
+    }
+
+    @Override
+    public String getName() {
+        assert(false);
+        return null;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        assert(false);
+        return null;
+    }
+}

diff --git a/awsapi/src/com/cloud/bridge/io/HdfsRangeInputStream.java b/awsapi/src/com/cloud/bridge/io/HdfsRangeInputStream.java
new file mode 100644
index 0000000..1b4d2e2
--- /dev/null
+++ b/awsapi/src/com/cloud/bridge/io/HdfsRangeInputStream.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloud.bridge.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * @author Kelven Yang
+ */
+public class HdfsRangeInputStream extends InputStream {
+    private FSDataInputStream in;
+    private long curPos;
+    private long endPos;
+    private long fileLength;
+
+    public HdfsRangeInputStream(FileSystem hdfs, Path src, long startPos, long endPos) throws IOException {
+        FileStatus status = hdfs.getFileStatus(src);
+        fileLength = status.getLen();
+
+        if(startPos > fileLength)
+            startPos = fileLength;
+
+        if(endPos > fileLength)
+            endPos = fileLength;
+
+        if(startPos > endPos)
+            throw new IllegalArgumentException("Invalid file range " + startPos + "-" + endPos);
+
+        this.curPos = startPos;
+        this.endPos = endPos;
+        in = hdfs.open(src);
+
+        in.seek(startPos);
+    }
+
+    @Override
+    public int available() throws IOException {
+        return (int)(endPos - curPos);
+    }
+
+    @Override
+    public int read() throws IOException {
+        if(available() > 0) {
+            int value = in.read();
+            curPos++;
+            return value;
+        }
+        return -1;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int bytesToRead = Math.min(len, available());
+        if(bytesToRead == 0)
+            return -1;
+
+        int bytesRead = in.read(b, off, bytesToRead);
+        if(bytesRead < 0)
+            return -1;
+
+        curPos += bytesRead;
+        return bytesRead;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        long skipped = Math.min(n, available());
+        in.skipBytes((int)skipped);
+        curPos += skipped;
+        return skipped;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+}

diff --git a/deps/awsapi-lib/hadoop-common-0.23.1.jar b/deps/awsapi-lib/hadoop-common-0.23.1.jar
new file mode 100755
index 0000000..c73f20c
Binary files /dev/null and b/deps/awsapi-lib/hadoop-common-0.23.1.jar differ

diff --git a/deps/awsapi-lib/hadoop-hdfs-0.23.1.jar b/deps/awsapi-lib/hadoop-hdfs-0.23.1.jar
new file mode 100755
index 0000000..e3bb8c3
Binary files /dev/null and b/deps/awsapi-lib/hadoop-hdfs-0.23.1.jar differ

-----------------------------------------------------------------------

hooks/post-receive
--
CloudStack OSS