incubator-deltacloud-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mar...@redhat.com
Subject [PATCH] Adds blob streaming uploads using PUT (stream through thin+deltacloud)
Date Tue, 07 Jun 2011 17:45:24 GMT
From: marios <marios@redhat.com>


Signed-off-by: marios <marios@redhat.com>
---
 server/lib/deltacloud/drivers/ec2/ec2_driver.rb |   51 +++++++-
 server/lib/deltacloud/helpers/blob_stream.rb    |  155 +++++++++++++++++++++--
 server/server.rb                                |   56 +++++++--
 server/views/blobs/new.html.haml                |   14 +-
 4 files changed, 242 insertions(+), 34 deletions(-)

diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index 4edd989..7f1e021 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -355,9 +355,9 @@ module Deltacloud
               bucket_list = s3_client.buckets
               bucket_list.each do |current|
                 buckets << Bucket.new({:name => current.name, :id => current.name})
-              end #bucket_list.each
-            end #if
-          end #safely
+              end
+            end
+          end
           filter_on(buckets, :id, opts)
         end
 
@@ -387,8 +387,12 @@ module Deltacloud
           blobs = []
           safely do
             s3_bucket = s3_client.bucket(opts['bucket'])
-            s3_bucket.keys({}, true).each do |s3_object|
-              blobs << convert_object(s3_object)
+            unless(opts[:id]).nil?
+              blobs << convert_object(s3_bucket.key(opts[:id], true))
+            else
+              s3_bucket.keys({}, true).each do |s3_object|
+                blobs << convert_object(s3_object)
+              end
             end
           end
           blobs = filter_on(blobs, :id, opts)
@@ -396,7 +400,7 @@ module Deltacloud
         end
 
         #--
-        # Create Blob
+        # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
         #--
         def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
           s3_client = new_client(credentials, :s3)
@@ -445,6 +449,40 @@ module Deltacloud
           end
         end
 
+        #params: {'user','password','bucket','blob','content_type', 'content_length', 'metadata'}
+        def blob_stream_connection(params)
+          #canonicalise metadata:
+          #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
+          metadata = params['metadata']
+          signature_meta_string = ""
+          unless metadata.nil?
+            metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 'x-amz-meta-')
+            keys_array = metadata.keys.sort!
+            keys_array.each {|k| signature_meta_string << "#{k}:#{metadata[k]}\n"}
+          end
+          #s3.amazonaws.com
+          provider = "https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}"
+          uri = URI.parse(provider)
+          http = Net::HTTP.new("#{params['bucket']}.#{uri.host}", uri.port )
+          http.use_ssl = true
+          http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+          timestamp = Time.now.httpdate
+          string_to_sign =
+            "PUT\n\n#{params['content_type']}\n#{timestamp}\n#{signature_meta_string}/#{params['bucket']}/#{params['blob']}"
+          auth_string = Aws::Utils::sign(params['password'], string_to_sign)
+          request = Net::HTTP::Put.new("/#{params['blob']}")
+          request['Host'] = "#{params['bucket']}.#{uri.host}"
+          request['Date'] = timestamp
+          request['Content-Type'] = params['content_type']
+          request['Content-Length'] = params['content_length']
+          request['Authorization'] = "AWS #{params['user']}:#{auth_string}"
+          request['Expect'] = "100-continue"
+          unless metadata.nil?
+            metadata.each{|k,v| request[k] = v}
+          end
+          return http, request
+        end
+
         def storage_volumes(credentials, opts={})
           ec2 = new_client( credentials )
           volume_list = (opts and opts[:id]) ? opts[:id] : nil
@@ -582,7 +620,6 @@ module Deltacloud
         end
 
         private
-
         def new_client(credentials, type = :ec2)
           klass = case type
                     when :elb then Aws::Elb
diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
index 00879a9..355a66a 100644
--- a/server/lib/deltacloud/helpers/blob_stream.rb
+++ b/server/lib/deltacloud/helpers/blob_stream.rb
@@ -17,7 +17,8 @@ include Deltacloud
 begin
   require 'eventmachine'
   #--
-  # based on the example from http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
+  # based on the example from
+  #   http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
   #--
   class BlobStream
     AsyncResponse = [-1, {}, []].freeze
@@ -32,11 +33,11 @@ begin
         'Content-Disposition' => params["content_disposition"],
         'Content-Length' => "#{params['content_length']}"}, body]
       }
-      #call the driver from here. the driver method yields for every chunk of blob it receives. We then
-      #use body.call to write that chunk as received.
+      #call the driver from here. the driver method yields for every chunk
+      #of blob it receives. Then use body.call to write that chunk as received.
       driver.blob_data(credentials, params[:bucket], params[:blob], params) {|chunk| body.call ["#{chunk}"]} #close blob_data block
       body.succeed
-      AsyncResponse # Tells Thin to not close the connection and continue it's work on other request
+      AsyncResponse # Tell Thin to not close connection & work other requests
     end
   end
 
@@ -69,13 +70,149 @@ class Hash
     remove = []
     self.each_key do |key|
       if key.to_s.match(rgx_pattern)
-         new_key = key.to_s.gsub(rgx_pattern, replacement)
+         new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
          self[new_key] = self[key]
          remove << key
-      end #key.match
-    end # each_key do
+      end
+    end
     #remove the original keys
     self.delete_if{|k,v| remove.include?(k)}
-  end #def
+  end
+
+end
+
+#Monkey patch for streaming blobs:
+# Normally a client will upload a blob to deltacloud and thin will put
+# this into a tempfile. Then deltacloud would stream up to the provider:
+#   i.e.  client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
+# Instead we want to recognise that this is a 'PUT blob' operation and
+# start streaming to the provider as the request is received:
+#   i.e.  client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider
+module Thin
+  class Request
+
+    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if defined?(Thin::Response)
+    private
+      def move_body_to_tempfile
+        if BlobStreamIO::is_put_blob(self)
+          @body = BlobStreamIO.new(self)
+        else
+          move_body_to_tempfile_orig
+        end
+      end
+
+  end
+end
+
+require 'net/http'
+#monkey patch for Net:HTTP
+module Net
+  class HTTP
 
-end #class
+    alias :request_orig :request
+
+    @blob_req # needs global scope for close op later
+
+    def request(req, body = nil, blob_stream = nil, &block)
+      unless blob_stream
+        return request_orig(req, body, &block)
+      end
+      @blob_req = req
+      do_start #start the connection
+
+      req.set_body_internal body
+      begin_transport req
+      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
+      @socket
+    end
+
+    class Put < HTTPRequest
+      def write_header_m(sock, ver, path)
+        write_header(sock, ver, path)
+      end
+    end
+
+    def end_request
+      begin
+      res = HTTPResponse.read_new(@socket)
+      end while res.kind_of?(HTTPContinue)
+      res.reading_body(@socket, @blob_req.response_body_permitted?) {
+        yield res if block_given?
+       }
+  end_transport @blob_req, res
+      do_finish
+      res
+    end
+  end
+
+end
+
+require 'base64'
+class BlobStreamIO
+
+  attr_accessor :size, :provider, :sock
+
+  def initialize(request)
+    @client_request = request
+    @size = 0
+    bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
+    user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
+    content_type = request.env['CONTENT_TYPE'] || ""
+    #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
+    user_meta = {}
+    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
+    @content_length = request.env['CONTENT_LENGTH']
+    @http, provider_request = driver.blob_stream_connection({'user'=>user,
+       'password'=>password, 'bucket'=>bucket, 'blob'=>blob, 'metadata'=> user_meta,
+       'content_type'=>content_type, 'content_length'=>@content_length })
+    @content_length = @content_length.to_i #for comparison of size in '<< (data)'
+    @sock = @http.request(provider_request, nil, true)
+  end
+
+  def << (data)
+    @sock.write(data)
+    @size += data.length
+    if (@size >= @content_length)
+      result = @http.end_request
+      if result.is_a?(Net::HTTPSuccess)
+        @client_request.env["BLOB_SUCCESS"] = "true"
+      else
+        @client_request.env["BLOB_FAIL"] = result.body
+      end
+    end
+  end
+
+  def rewind
+  end
+
+  #use the Request.env hash (populated by the ThinParser) to determine whether
+  #this is a post blob operation. By definition, only get here with a body of
+  # > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
+  def self.is_put_blob(request = nil)
+    path = request.env['PATH_INFO']
+    method = request.env['REQUEST_METHOD']
+    if ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/ && method == 'PUT' )
+      return true
+    else
+      return false
+    end
+  end
+
+  private
+
+  def parse_bucket_blob(request_string)
+    array = request_string.split("/")
+    blob = array.pop
+    bucket = array.pop
+    return bucket, blob
+  end
+
+  def parse_credentials(request_string)
+    decoded = Base64.decode64(request_string.split('Basic ').last)
+    key = decoded.split(':').first
+    pass = decoded.split(':').last
+    return key, pass
+  end
+
+end
diff --git a/server/server.rb b/server/server.rb
index e332679..a8f5ee0 100644
--- a/server/server.rb
+++ b/server/server.rb
@@ -696,13 +696,46 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob" do
   end
 end
 
-#create a new blob
+#create a new blob using PUT - streams through deltacloud
+put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
+  if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+    content_type = env["CONTENT_TYPE"]
+    content_type ||=  ""
+    @blob = driver.blob(credentials, {:id => params[:blob],
+                                      'bucket' => params[:bucket]})
+    respond_to do |format|
+      format.html { haml :"blobs/show" }
+      format.xml { haml :"blobs/show" }
+      format.json { convert_to_json(:blobs, @blob) }
+    end
+  elsif(env["BLOB_FAIL"])
+    report_error(500) #OK?
+  else # small blobs - < 112kb dont hit the streaming monkey patch - redirect to POST rule
+       # also, if running under webrick don't hit the streaming patch (Thin specific)
+    path = bucket_url(params[:bucket])
+    status, headers, body = call! env.merge({"PATH_INFO" => "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/#{params[:bucket]}",
+                                             "REQUEST_METHOD" => "POST",
+                                             "DELTACLOUD_BLOB_NAME" => params[:blob]})
+  end
+end
+
+#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data)
 post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
   bucket_id = params[:bucket]
-  blob_id = params['blob_id']
-  blob_data = params['blob_data']
+  #check if we were passed here from the PUT method - with a small blob (<112kb)
+  if(env['DELTACLOUD_BLOB_NAME'])
+    blob_id = env['DELTACLOUD_BLOB_NAME']
+    temp_file = Tempfile.new("temp_blob_file")
+    temp_file.write(env['rack.input'].read)
+    temp_file.flush
+    content_type = env['CONTENT_TYPE'] || ""
+    blob_data = {:tempfile => temp_file, :type => content_type}
+  else
+    blob_id = params['blob']
+    blob_data = params['blob_data']
+  end
   user_meta = {}
-#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
+  #first try metadata from params (i.e., passed by http form post, e.g. browser)
   max = params[:meta_params]
   if(max)
     (1..max.to_i).each do |i|
@@ -710,12 +743,13 @@ post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
       key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
       value = params[:"meta_value#{i}"]
       user_meta[key] = value
-    end #max.each do
-  else #can try to get blob_metadata from http headers
-    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
-    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
-  end #end if
+    end
+  else #can try to get blob_metadata from http headers (i.e. from PUT)
+      meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+      meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
+  end
   @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
+  temp_file.delete if temp_file
   respond_to do |format|
     format.html { haml :"blobs/show"}
     format.xml { haml :"blobs/show" }
@@ -730,7 +764,7 @@ delete "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
   respond_to do |format|
     format.xml {  204 }
     format.json {  204 }
-    format.html { bucket_url(bucket_id) }
+    format.html { redirect(bucket_url(bucket_id)) }
   end
 end
 
@@ -768,7 +802,7 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
     respond_to do |format|
       format.html { haml :"blobs/show" }
       format.xml { haml :"blobs/show" }
-      format.json { convert_to_json(blobs, @blob) }
+      format.json { convert_to_json(:blobs, @blob) }
       end
   else
       report_error(404)
diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
index a075f0a..bf5c6f5 100644
--- a/server/views/blobs/new.html.haml
+++ b/server/views/blobs/new.html.haml
@@ -3,13 +3,7 @@
 %form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => 'multipart/form-data'}
   %label
     Blob Name:
-    %input{ :name => 'blob_id', :size => 512}/
-  %label
-    Blob Data:
-    %br
-    %input{ :type => "file", :name => 'blob_data', :size => 50}/
-    %br
-    %br
+    %input{ :name => 'blob', :size => 512}/
   %input{ :type => "hidden", :name => "meta_params", :value => "0"}
   %a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata
   %div{ :id => "metadata_holder", :style => "display: none;"}
@@ -23,4 +17,10 @@
   %a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata
   %br
   %br
+  %label
+    Blob Data:
+    %br
+    %input{ :type => "file", :name => 'blob_data', :size => 50}/
+    %br
+    %br
   %input{ :type => :submit, :name => "commit", :value => "create"}/
-- 
1.7.3.4


Mime
View raw message