incubator-deltacloud-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mandr...@redhat.com
Subject [PATCH 3/4] Adds driver methods and helpers for blobstore api - rackspace cloudfiles, amazon s3, microsoft azure. Also helper method for 'streaming' using (thin) eventmachine async callback.
Date Mon, 20 Sep 2010 18:52:18 GMT
From: marios <marios@redhat.com>

---
 server/lib/deltacloud/base_driver/base_driver.rb   |   30 +++++
 server/lib/deltacloud/base_driver/features.rb      |    7 +
 .../lib/deltacloud/drivers/azure/azure_driver.rb   |  127 ++++++++++++++++++++
 server/lib/deltacloud/drivers/ec2/ec2_driver.rb    |  111 +++++++++++++++++-
 .../drivers/rackspace/rackspace_driver.rb          |  115 ++++++++++++++++--
 .../lib/deltacloud/helpers/application_helper.rb   |    7 +
 server/lib/deltacloud/helpers/blob_stream.rb       |   51 ++++++++
 server/lib/drivers.rb                              |    1 +
 8 files changed, 439 insertions(+), 10 deletions(-)
 create mode 100644 server/lib/deltacloud/drivers/azure/azure_driver.rb
 create mode 100644 server/lib/deltacloud/helpers/blob_stream.rb

diff --git a/server/lib/deltacloud/base_driver/base_driver.rb b/server/lib/deltacloud/base_driver/base_driver.rb
index 88563a5..639b13e 100644
--- a/server/lib/deltacloud/base_driver/base_driver.rb
+++ b/server/lib/deltacloud/base_driver/base_driver.rb
@@ -185,6 +185,36 @@ module Deltacloud
       []
     end
 
+    def buckets(credentials, opts = nil)
+      #list of buckets belonging to account
+      []
+    end
+
+    def bucket(credentials, opts = nil)
+    #first bucket returned by buckets(credentials, opts), or nil if there is none
+      list = buckets(credentials, opts)
+      return list.first unless list.empty?
+      nil
+    end
+    
+    def create_bucket(credentials, name, opts=nil)
+    end
+    
+    def delete_bucket(credentials, name, opts=nil)
+    end
+    
+    def blobs(credentials, opts = nil)
+      []
+    end
+    
+    def blob(credentials, opts = nil)
+       list = blobs(credentials, opts)
+       return list.first unless list.empty?
+    end
+    
+    def blob_data(credentials, bucket_id, blob_id, opts)
+    end
+
     def filter_on(collection, attribute, opts)
       return collection if opts.nil?
       return collection if opts[attribute].nil?
diff --git a/server/lib/deltacloud/base_driver/features.rb b/server/lib/deltacloud/base_driver/features.rb
index 3ed4085..3b19ca2 100644
--- a/server/lib/deltacloud/base_driver/features.rb
+++ b/server/lib/deltacloud/base_driver/features.rb
@@ -162,5 +162,12 @@ module Deltacloud
       description "Size instances according to changes to a hardware profile"
       # The parameters are filled in from the hardware profiles
     end
+
+    declare_feature :buckets, :bucket_location do
+      description "Take extra location parameter for Bucket creation (e.g. S3, 'eu' or 'us-west-1')"
+      operation :create do
+        param :location, :string, :optional
+      end
+    end
   end
 end
diff --git a/server/lib/deltacloud/drivers/azure/azure_driver.rb b/server/lib/deltacloud/drivers/azure/azure_driver.rb
new file mode 100644
index 0000000..ab4caeb
--- /dev/null
+++ b/server/lib/deltacloud/drivers/azure/azure_driver.rb
@@ -0,0 +1,127 @@
+#
+# Copyright (C) 2010  Red Hat, Inc.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.  The
+# ASF licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the
+# License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+#Windows Azure (WAZ) gem at http://github.com/johnnyhalife/waz-storage
+require 'waz-blobs'
+require 'deltacloud/base_driver'
+module Deltacloud
+  module Drivers
+    module Azure
+
+class AzureDriver < Deltacloud::BaseDriver
+
+  def supported_collections; [:buckets] 
+  end
+  
+#--
+# Buckets
+#--
+  def buckets(credentials, opts)
+    buckets = []
+    azure_connect(credentials)
+    safely do
+      WAZ::Blobs::Container.list.each do |waz_container|
+        buckets << convert_container(waz_container)
+      end
+    end
+    buckets = filter_on(buckets, :id, opts)
+  end
+
+#--
+# Create bucket
+#--
+  def create_bucket(credentials, name, opts)
+    bucket = nil
+    azure_connect(credentials)
+    safely do
+      waz_container = WAZ::Blobs::Container.create(name)
+      bucket = convert_container(waz_container)
+    end
+    bucket
+  end
+
+#--
+# Delete bucket
+#--
+  def delete_bucket(credentials, name, opts)
+    azure_connect(credentials)
+    safely do
+      WAZ::Blobs::Container.find(name).destroy!
+    end
+  end
+  
+#--
+# Blobs
+#--
+  def blobs(credentials, opts)
+    blob_list = []
+    azure_connect(credentials)
+    safely do
+      the_bucket = WAZ::Blobs::Container.find(opts['bucket'])
+      the_bucket.blobs.each do |waz_blob|
+        blob_list << convert_blob(waz_blob)
+      end
+    end
+    blob_list = filter_on(blob_list, :id, opts)
+    blob_list
+  end
+  
+  def blob_data(credentials, bucket_id, blob_id, opts)
+    azure_connect(credentials)
+    # WAZ get blob data methods cant accept blocks for 'streaming'... FIXME
+      yield WAZ::Blobs::Container.find(bucket_id)[blob_id].value 
+  end
+  
+  private
+  
+  def azure_connect(credentials)
+    options = {:account_name => credentials.user, :access_key => credentials.password}
+    safely do
+      WAZ::Storage::Base.establish_connection!(options)
+    end
+  end
+
+  def convert_container(waz_container)
+    blob_list = []
+    waz_container.blobs.each do |blob|
+      blob_list << blob.name
+    end
+    Bucket.new({ :id => waz_container.name, 
+                    :name => waz_container.name,
+                    :size => blob_list.size,
+                    :blob_list => blob_list
+                  })  
+  end
+
+  def convert_blob(waz_blob)
+    url = waz_blob.url.split('/')
+    bucket = url[url.length-2] #FIXME
+    Blob.new({   :id => waz_blob.name,
+                 :bucket => bucket,
+                 :content_length => waz_blob.metadata[:content_length],
+                 :content_type => waz_blob.metadata[:content_type],
+                 :last_modified => waz_blob.metadata[:last_modified]
+              })
+  end
+
+
+end
+
+    end #module Azure
+  end #module Drivers
+end #module Deltacloud
diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index 5e57f4e..1e5032d 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -19,6 +19,7 @@
 
 require 'deltacloud/base_driver'
 require 'AWS'
+require 'right_aws'
 
 class Instance
   attr_accessor :keyname
@@ -36,12 +37,13 @@ module Deltacloud
 class EC2Driver < Deltacloud::BaseDriver
 
   def supported_collections
-    DEFAULT_COLLECTIONS + [ :keys ]
+    DEFAULT_COLLECTIONS + [ :keys, :buckets ]
   end
 
   feature :instances, :user_data
   feature :instances, :authentication_key
   feature :images, :owner_id
+  feature :buckets, :bucket_location 
 
   define_hardware_profile('m1.small') do
     cpu                1
@@ -319,6 +321,83 @@ class EC2Driver < Deltacloud::BaseDriver
     return realms ? true : false
   end
 
+#--
+# Buckets 
+#-- get a list of your buckets from the s3 service
+  def buckets(credentials, opts)
+    buckets = []
+    safely do
+      s3_client = s3_client(credentials)
+      bucket_list = s3_client.buckets
+      bucket_list.each do |current|
+        buckets << convert_bucket(current)
+      end
+    end
+    buckets = filter_on(buckets, :id, opts) 
+    buckets
+  end
+
+#--
+# Create bucket
+#-- 
+#valid values for bucket location: 'EU'|'us-west-1'|'ap-southeast-1' - if you
+#don't specify a location then by default buckets are created in 'us-east'
+#[but if you *do* specify 'us-east' things blow up] 
+  def create_bucket(credentials, name, opts={})
+    bucket = nil
+    safely do
+      begin 
+        s3_client = s3_client(credentials)
+        bucket_location = opts['location']
+        if bucket_location
+          bucket = RightAws::S3::Bucket.create(s3_client, name, true, nil, :location => bucket_location)
+        else
+          bucket = RightAws::S3::Bucket.create(s3_client, name, true)
+        end #if
+        rescue RightAws::AwsError => e
+          raise e unless e.message =~ /BucketAlreadyExists/
+          raise Deltacloud::BackendError.new(409, e.class.to_s, e.message, e.backtrace) 
+      end #begin
+    end #do
+    convert_bucket(bucket)
+  end
+
+#--
+# Delete_bucket
+#--
+  def delete_bucket(credentials, name, opts={})
+    s3_client = s3_client(credentials)
+    safely do
+      s3_client.interface.delete_bucket(name)
+    end
+  end
+
+#--
+# Blobs 
+#--
+  def blobs(credentials, opts = nil)
+    s3_client = s3_client(credentials)
+    blobs = []
+    safely do
+      s3_bucket = s3_client.bucket(opts['bucket'])
+      s3_bucket.keys({}, true).each do |s3_object|
+        blobs << convert_object(s3_object)
+      end
+    end
+    blobs = filter_on(blobs, :id, opts)
+    blobs
+  end
+
+#--
+# Blob data
+#--  
+  def blob_data(credentials, bucket_id, blob_id, opts)
+    s3_client = s3_client(credentials)
+    s3_client.interface.get(bucket_id, blob_id) do |chunk|
+      yield chunk
+    end
+  end
+
   private
 
   def new_client(credentials)
@@ -404,6 +483,36 @@ class EC2Driver < Deltacloud::BaseDriver
     } )
   end
 
+  def s3_client(credentials)
+    safely do
+      s3_client = RightAws::S3.new(credentials.user, credentials.password)
+    end
+  end
+
+  def convert_bucket(s3_bucket)
+    #get blob list:
+    blob_list = []
+    s3_bucket.keys.each do |s3_object|
+      blob_list << s3_object.name
+    end
+    #can use AWS::S3::Owner.current.display_name or current.id
+    Bucket.new(  { :id => s3_bucket.name, 
+                      :name => s3_bucket.name,
+                      :size => s3_bucket.keys.length,
+                      :blob_list => blob_list
+                    }
+                 )
+  end
+  
+  def convert_object(s3_object)
+    Blob.new({   :id => s3_object.name,
+                 :bucket => s3_object.bucket.name.to_s,
+                 :content_length => s3_object.size,
+                 :content_type => s3_object.content_type,
+                 :last_modified => s3_object.last_modified
+              })
+  end
+
   def catched_exceptions_list
     {
       :auth => [ AWS::AuthFailure ],
diff --git a/server/lib/deltacloud/drivers/rackspace/rackspace_driver.rb b/server/lib/deltacloud/drivers/rackspace/rackspace_driver.rb
index 7926600..f8115a0 100644
--- a/server/lib/deltacloud/drivers/rackspace/rackspace_driver.rb
+++ b/server/lib/deltacloud/drivers/rackspace/rackspace_driver.rb
@@ -18,6 +18,7 @@
 
 require 'deltacloud/base_driver'
 require 'deltacloud/drivers/rackspace/rackspace_client'
+require 'cloudfiles'
 
 module Deltacloud
   module Drivers
@@ -27,6 +28,10 @@ class RackspaceDriver < Deltacloud::BaseDriver
 
   feature :instances, :user_name
 
+  def supported_collections
+    DEFAULT_COLLECTIONS + [ :buckets ]
+  end
+
   def hardware_profiles(credentials, opts = nil)
     racks = new_client( credentials )
     safely do
@@ -141,6 +146,88 @@ class RackspaceDriver < Deltacloud::BaseDriver
   end
 
 
+ 
+
+  define_instance_states do
+    start.to( :pending )          .on( :create )
+
+    pending.to( :running )        .automatically
+
+    running.to( :running )        .on( :reboot )
+    running.to( :shutting_down )  .on( :stop )
+
+    shutting_down.to( :stopped )  .automatically
+
+    stopped.to( :finish )         .automatically
+  end
+
+#--
+# Buckets
+#--
+  def buckets(credentials, opts)
+    bucket_list = []
+    cf = cloudfiles_client(credentials)
+    safely do
+      cf.containers.each do |container_name|
+        current = cf.container(container_name)
+        bucket_list << convert_container(current)
+      end #containers.each
+    end #safely
+    bucket_list = filter_on(bucket_list, :id, opts)
+    bucket_list
+  end
+
+#--
+# Create Bucket
+#--
+  def create_bucket(credentials, name, opts)
+    bucket = nil
+    cf = cloudfiles_client(credentials)
+    safely do
+      new_bucket = cf.create_container(name)
+      bucket = convert_container(new_bucket)
+    end
+    bucket
+  end
+
+#--
+# Delete Bucket
+#--  
+  def delete_bucket(credentials, name, opts)
+    cf = cloudfiles_client(credentials)
+    safely do
+      cf.delete_container(name)
+    end
+  end
+
+#--
+# Blobs
+#--
+  def blobs(credentials, opts)
+    cf = cloudfiles_client(credentials)
+    blobs = []
+    safely do
+      cf_container = cf.container(opts['bucket'])
+      cf_container.objects.each do |object_name|
+        blobs << convert_object(cf_container.object(object_name))
+      end
+    end
+    blobs = filter_on(blobs, :id, opts)
+    blobs
+  end
+
+#-
+# Blob data
+#-  
+  def blob_data(credentials, bucket_id, blob_id, opts)
+    cf = cloudfiles_client(credentials)
+    cf.container(bucket_id).object(blob_id).data_stream do |chunk|
+      yield chunk 
+    end
+  end  
+
+private
+
   def convert_srv_to_instance(srv)
     inst = Instance.new(:id => srv["id"].to_s,
                         :owner_id => "root",
@@ -163,17 +250,27 @@ class RackspaceDriver < Deltacloud::BaseDriver
     end
   end
 
-  define_instance_states do
-    start.to( :pending )          .on( :create )
-
-    pending.to( :running )        .automatically
-
-    running.to( :running )        .on( :reboot )
-    running.to( :shutting_down )  .on( :stop )
+  def convert_container(cf_container)
+    Bucket.new({ :id => cf_container.name, 
+                    :name => cf_container.name,
+                    :size => cf_container.count,
+                    :blob_list => cf_container.objects
+                 }) 
+  end
 
-    shutting_down.to( :stopped )  .automatically
+  def convert_object(cf_object)
+    Blob.new({   :id => cf_object.name,
+                 :bucket => cf_object.container.name,
+                 :content_length => cf_object.bytes,
+                 :content_type => cf_object.content_type,
+                 :last_modified => cf_object.last_modified
+              })
+  end
 
-    stopped.to( :finish )         .automatically
+  def cloudfiles_client(credentials)
+    safely do
+      CloudFiles::Connection.new(credentials.user, credentials.password)
+    end
   end
 
   def safely(&block)
diff --git a/server/lib/deltacloud/helpers/application_helper.rb b/server/lib/deltacloud/helpers/application_helper.rb
index 9a9dfdc..35daf67 100644
--- a/server/lib/deltacloud/helpers/application_helper.rb
+++ b/server/lib/deltacloud/helpers/application_helper.rb
@@ -54,6 +54,13 @@ module ApplicationHelper
     return 'password' if driver_has_feature?(:authentication_password)
   end
 
+  def driver_has_bucket_location_feature?
+    driver.features(:buckets).each do |feat|
+      return true if feat.name == :bucket_location
+    end
+    false
+  end
+
   def filter_all(model)
       filter = {}
       filter.merge!(:id => params[:id]) if params[:id]
diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
new file mode 100644
index 0000000..41b2cef
--- /dev/null
+++ b/server/lib/deltacloud/helpers/blob_stream.rb
@@ -0,0 +1,51 @@
+# Copyright (C) 2010  Red Hat, Inc.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.  The
+# ASF licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the
+# License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+#--
+# based on the example from http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
+#--
+
+class BlobStream
+  AsyncResponse = [-1, {}, []].freeze
+  def self.call(env, credentials, params)
+    body = DeferrableBody.new
+        #Get the headers out asap. Don't specify a content-type let
+        #the client guess and if they can't they SHOULD default to
+        #'application/octet-stream' anyway as per:
+        #http://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1
+    EM.next_tick { env['async.callback'].call [200, {}, body] }
+    #call the driver from here. the driver method yields for every chunk of blob it receives. We then
+    #use body.call to write that chunk as received.
+    driver.blob_data(credentials, params[:bucket], params[:blob], params) {|chunk| body.call ["#{chunk}"]} #close blob_data block
+    body.succeed
+    AsyncResponse # Tells Thin to not close the connection and continue its work on other requests
+  end
+end
+
+class DeferrableBody
+  include EventMachine::Deferrable
+
+  def call(body)
+    body.each do |chunk|
+      @body_callback.call(chunk)
+    end
+  end
+
+  def each(&blk)
+    @body_callback = blk
+  end
+end
diff --git a/server/lib/drivers.rb b/server/lib/drivers.rb
index 261e0c8..6e31bb7 100644
--- a/server/lib/drivers.rb
+++ b/server/lib/drivers.rb
@@ -6,6 +6,7 @@ DRIVERS = {
   :rimuhosting => { :name => "RimuHosting"},
   :opennebula => { :name => "Opennebula", :class => "OpennebulaDriver" },
   :terremark => { :name => "Terremark"},
+  :azure => { :name => "Azure" },
   :mock => { :name => "Mock" }
 }
 
-- 
1.7.2.3


Mime
View raw message