incubator-deltacloud-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Lutterkort <lut...@redhat.com>
Subject Re: [PATCH] Adds blob streaming uploads using PUT (stream through thin+deltacloud)
Date Wed, 08 Jun 2011 08:52:52 GMT
On Tue, 2011-06-07 at 20:45 +0300, marios@redhat.com wrote:
> From: marios <marios@redhat.com>
> 
> 
> Signed-off-by: marios <marios@redhat.com>

Almost ACK; there are a few stylistic changes that would
simplify the code:

> ---
>  server/lib/deltacloud/drivers/ec2/ec2_driver.rb |   51 +++++++-
>  server/lib/deltacloud/helpers/blob_stream.rb    |  155 +++++++++++++++++++++--
>  server/server.rb                                |   56 +++++++--
>  server/views/blobs/new.html.haml                |   14 +-
>  4 files changed, 242 insertions(+), 34 deletions(-)
> 
> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> index 4edd989..7f1e021 100644
> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> @@ -355,9 +355,9 @@ module Deltacloud
>                bucket_list = s3_client.buckets
>                bucket_list.each do |current|
>                  buckets << Bucket.new({:name => current.name, :id => current.name})
> -              end #bucket_list.each
> -            end #if
> -          end #safely
> +              end
> +            end
> +          end
>            filter_on(buckets, :id, opts)
>          end
>  
> @@ -387,8 +387,12 @@ module Deltacloud
>            blobs = []
>            safely do
>              s3_bucket = s3_client.bucket(opts['bucket'])
> -            s3_bucket.keys({}, true).each do |s3_object|
> -              blobs << convert_object(s3_object)
> +            unless(opts[:id]).nil?

Better: 'if opts[:id]'

> +              blobs << convert_object(s3_bucket.key(opts[:id], true))
> +            else
> +              s3_bucket.keys({}, true).each do |s3_object|
> +                blobs << convert_object(s3_object)
> +              end
>              end
>            end
>            blobs = filter_on(blobs, :id, opts)

This and the previous hunk really belong in a different patch.

> @@ -396,7 +400,7 @@ module Deltacloud
>          end
>  
>          #--
> -        # Create Blob
> +        # Create Blob - NON Streaming way (i.e. was called with POST html multipart
form data)
>          #--
>          def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
>            s3_client = new_client(credentials, :s3)
> @@ -445,6 +449,40 @@ module Deltacloud
>            end
>          end
>  
> +        #params: {'user','password','bucket','blob','content_type', 'content_length',
'metadata'}
> +        def blob_stream_connection(params)
> +          #canonicalise metadata:
> +          #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
> +          metadata = params['metadata']
> +          signature_meta_string = ""
> +          unless metadata.nil?
> +            metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 'x-amz-meta-')
> +            keys_array = metadata.keys.sort!
> +            keys_array.each {|k| signature_meta_string << "#{k}:#{metadata[k]}\n"}
> +          end
> +          #s3.amazonaws.com
> +          provider = "https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}"

This should use endpoint_for_service so that provider selection works as
expected.

> +          uri = URI.parse(provider)
> +          http = Net::HTTP.new("#{params['bucket']}.#{uri.host}", uri.port )
> +          http.use_ssl = true
> +          http.verify_mode = OpenSSL::SSL::VERIFY_NONE
> +          timestamp = Time.now.httpdate
> +          string_to_sign =
> +            "PUT\n\n#{params['content_type']}\n#{timestamp}\n#{signature_meta_string}/#{params['bucket']}/#{params['blob']}"
> +          auth_string = Aws::Utils::sign(params['password'], string_to_sign)
> +          request = Net::HTTP::Put.new("/#{params['blob']}")
> +          request['Host'] = "#{params['bucket']}.#{uri.host}"
> +          request['Date'] = timestamp
> +          request['Content-Type'] = params['content_type']
> +          request['Content-Length'] = params['content_length']
> +          request['Authorization'] = "AWS #{params['user']}:#{auth_string}"
> +          request['Expect'] = "100-continue"
> +          unless metadata.nil?
> +            metadata.each{|k,v| request[k] = v}
> +          end
> +          return http, request
> +        end
> +
>          def storage_volumes(credentials, opts={})
>            ec2 = new_client( credentials )
>            volume_list = (opts and opts[:id]) ? opts[:id] : nil
> @@ -582,7 +620,6 @@ module Deltacloud
>          end
>  
>          private
> -
>          def new_client(credentials, type = :ec2)
>            klass = case type
>                      when :elb then Aws::Elb
> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
> index 00879a9..355a66a 100644
> --- a/server/lib/deltacloud/helpers/blob_stream.rb
> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
> @@ -17,7 +17,8 @@ include Deltacloud
>  begin
>    require 'eventmachine'
>    #--
> -  # based on the example from http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
> +  # based on the example from
> +  #   http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
>    #--
>    class BlobStream
>      AsyncResponse = [-1, {}, []].freeze
> @@ -32,11 +33,11 @@ begin
>          'Content-Disposition' => params["content_disposition"],
>          'Content-Length' => "#{params['content_length']}"}, body]
>        }
> -      #call the driver from here. the driver method yields for every chunk of blob it
receives. We then
> -      #use body.call to write that chunk as received.
> +      #call the driver from here. the driver method yields for every chunk
> +      #of blob it receives. Then use body.call to write that chunk as received.
>        driver.blob_data(credentials, params[:bucket], params[:blob], params) {|chunk|
body.call ["#{chunk}"]} #close blob_data block
>        body.succeed
> -      AsyncResponse # Tells Thin to not close the connection and continue it's work
on other request
> +      AsyncResponse # Tell Thin to not close connection & work other requests
>      end
>    end

The last two hunks only change formatting and should really go into a
separate patch.

> @@ -69,13 +70,149 @@ class Hash
>      remove = []
>      self.each_key do |key|
>        if key.to_s.match(rgx_pattern)
> -         new_key = key.to_s.gsub(rgx_pattern, replacement)
> +         new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
>           self[new_key] = self[key]
>           remove << key
> -      end #key.match
> -    end # each_key do
> +      end
> +    end
>      #remove the original keys
>      self.delete_if{|k,v| remove.include?(k)}
> -  end #def
> +  end
> +
> +end
> +
> +#Monkey patch for streaming blobs:
> +# Normally a client will upload a blob to deltacloud and thin will put
> +# this into a tempfile. Then deltacloud would stream up to the provider:
> +#   i.e.  client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->>
provider
> +# Instead we want to recognise that this is a 'PUT blob' operation and
> +# start streaming to the provider as the request is received:
> +#   i.e.  client =-->>STREAM-->> deltacloud =-->>STREAM-->>
provider
> +module Thin
> +  class Request
> +
> +    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if defined?(Thin::Response)

Why does this need to be made conditional? If we are not running under
Thin, it doesn't really matter what move_body_to_tempfile does, since
e.g. WEBrick will never call it.

> +    private
> +      def move_body_to_tempfile
> +        if BlobStreamIO::is_put_blob(self)
> +          @body = BlobStreamIO.new(self)
> +        else
> +          move_body_to_tempfile_orig
> +        end
> +      end
> +
> +  end
> +end
> +
> +require 'net/http'
> +#monkey patch for Net:HTTP
> +module Net
> +  class HTTP
>  
> -end #class
> +    alias :request_orig :request
> +
> +    @blob_req # needs global scope for close op later

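That line isn't needed: in the class body it only evaluates an instance
variable of the Net::HTTP class object itself, so it neither declares nor
scopes the per-connection @blob_req that #request assigns.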

> +    def request(req, body = nil, blob_stream = nil, &block)
> +      unless blob_stream
> +        return request_orig(req, body, &block)
> +      end
> +      @blob_req = req
> +      do_start #start the connection
> +
> +      req.set_body_internal body
> +      begin_transport req
> +      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
> +      @socket
> +    end
> +
> +    class Put < HTTPRequest
> +      def write_header_m(sock, ver, path)
> +        write_header(sock, ver, path)
> +      end
> +    end
> +
> +    def end_request
> +      begin
> +      res = HTTPResponse.read_new(@socket)
> +      end while res.kind_of?(HTTPContinue)
> +      res.reading_body(@socket, @blob_req.response_body_permitted?) {
> +        yield res if block_given?
> +       }
> +  end_transport @blob_req, res

Please fix the indentation in this method (spaces, aligned consistently).

> +      do_finish
> +      res
> +    end
> +  end
> +
> +end
> +
> +require 'base64'
> +class BlobStreamIO
> +
> +  attr_accessor :size, :provider, :sock
> +
> +  def initialize(request)
> +    @client_request = request
> +    @size = 0
> +    bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
> +    user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
> +    content_type = request.env['CONTENT_TYPE'] || ""
> +    #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
> +    user_meta = {}
> +    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}

The inject here is pointless: you never change result, and its return
value is discarded, so it only works through the side effect on user_meta.
Either use each, or something like

        user_meta = meta_array.inject({}){ |res, kv| res[kv.first.upcase] = kv.last; res }

Also, there's a lot of blob metadata handling scattered through the code
(here, the ec2 driver, etc.). Not for this patch, but in general it would be
cleaner to have some helper methods to deal with that. For example, a
helper method 'extract_blob_meta(headers, opts)' that does the above
and takes an option to rename keys, so that in the ec2_driver you can
call

        headers = extract_blob_meta(request.env, :rename => "x-amz-meta-")

> +    @content_length = request.env['CONTENT_LENGTH']
> +    @http, provider_request = driver.blob_stream_connection({'user'=>user,
> +       'password'=>password, 'bucket'=>bucket, 'blob'=>blob, 'metadata'=>
user_meta,
> +       'content_type'=>content_type, 'content_length'=>@content_length })

Minor stylistic nit: it looks a little odd to use string keys here
instead of symbols.

> +    @content_length = @content_length.to_i #for comparison of size in '<< (data)'
> +    @sock = @http.request(provider_request, nil, true)
> +  end
> +
> +  def << (data)
> +    @sock.write(data)
> +    @size += data.length
> +    if (@size >= @content_length)
> +      result = @http.end_request
> +      if result.is_a?(Net::HTTPSuccess)
> +        @client_request.env["BLOB_SUCCESS"] = "true"
> +      else
> +        @client_request.env["BLOB_FAIL"] = result.body
> +      end
> +    end
> +  end
> +
> +  def rewind
> +  end
> +
> +  #use the Request.env hash (populated by the ThinParser) to determine whether
> +  #this is a post blob operation. By definition, only get here with a body of
> +  # > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
> +  def self.is_put_blob(request = nil)
> +    path = request.env['PATH_INFO']
> +    method = request.env['REQUEST_METHOD']
> +    if ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/
&& method == 'PUT' )
> +      return true
> +    else
> +      return false
> +    end
> +  end
> +
> +  private
> +
> +  def parse_bucket_blob(request_string)
> +    array = request_string.split("/")
> +    blob = array.pop
> +    bucket = array.pop
> +    return bucket, blob
> +  end
> +
> +  def parse_credentials(request_string)
> +    decoded = Base64.decode64(request_string.split('Basic ').last)
> +    key = decoded.split(':').first
> +    pass = decoded.split(':').last
> +    return key, pass
> +  end
> +
> +end
> diff --git a/server/server.rb b/server/server.rb
> index e332679..a8f5ee0 100644
> --- a/server/server.rb
> +++ b/server/server.rb
> @@ -696,13 +696,46 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob"
do
>    end
>  end
>  
> -#create a new blob
> +#create a new blob using PUT - streams through deltacloud
> +put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
> +  if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
> +    content_type = env["CONTENT_TYPE"]
> +    content_type ||=  ""
> +    @blob = driver.blob(credentials, {:id => params[:blob],
> +                                      'bucket' => params[:bucket]})
> +    respond_to do |format|
> +      format.html { haml :"blobs/show" }
> +      format.xml { haml :"blobs/show" }
> +      format.json { convert_to_json(:blobs, @blob) }
> +    end
> +  elsif(env["BLOB_FAIL"])
> +    report_error(500) #OK?
> +  else # small blobs - < 112kb dont hit the streaming monkey patch - redirect to
POST rule
> +       # also, if running under webrick don't hit the streaming patch (Thin specific)
> +    path = bucket_url(params[:bucket])
> +    status, headers, body = call! env.merge({"PATH_INFO" => "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/#{params[:bucket]}",
> +                                             "REQUEST_METHOD" => "POST",
> +                                             "DELTACLOUD_BLOB_NAME" => params[:blob]})

Could we avoid calling the POST rule if we factored the things that both
PUT and POST need into a helper method ?

> +  end
> +end
> +
> +#create a new blob using html interface - NON STREAMING (i.e. browser POST http form
data)
>  post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
>    bucket_id = params[:bucket]
> -  blob_id = params['blob_id']
> -  blob_data = params['blob_data']
> +  #check if we were passed here from the PUT method - with a small blob (<112kb)
> +  if(env['DELTACLOUD_BLOB_NAME'])
> +    blob_id = env['DELTACLOUD_BLOB_NAME']
> +    temp_file = Tempfile.new("temp_blob_file")
> +    temp_file.write(env['rack.input'].read)
> +    temp_file.flush
> +    content_type = env['CONTENT_TYPE'] || ""
> +    blob_data = {:tempfile => temp_file, :type => content_type}
> +  else
> +    blob_id = params['blob']
> +    blob_data = params['blob_data']
> +  end

This seems too complicated to me. If we have to jump through hoops to
get to the POST handler, and then through more hoops to have the POST
handler detect that we are coming from PUT, we really should be splitting
that logic out into helper method(s).

>    user_meta = {}
> -#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
> +  #first try metadata from params (i.e., passed by http form post, e.g. browser)
>    max = params[:meta_params]
>    if(max)
>      (1..max.to_i).each do |i|
> @@ -710,12 +743,13 @@ post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket"
do
>        key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
>        value = params[:"meta_value#{i}"]
>        user_meta[key] = value
> -    end #max.each do
> -  else #can try to get blob_metadata from http headers
> -    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> -    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> -  end #end if
> +    end
> +  else #can try to get blob_metadata from http headers (i.e. from PUT)
> +      meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +      meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}

Another potential user of extract_blob_meta ;)

> +  end
>    @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
> +  temp_file.delete if temp_file
>    respond_to do |format|
>      format.html { haml :"blobs/show"}
>      format.xml { haml :"blobs/show" }
> @@ -730,7 +764,7 @@ delete "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>    respond_to do |format|
>      format.xml {  204 }
>      format.json {  204 }
> -    format.html { bucket_url(bucket_id) }
> +    format.html { redirect(bucket_url(bucket_id)) }
>    end
>  end
>  
> @@ -768,7 +802,7 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>      respond_to do |format|
>        format.html { haml :"blobs/show" }
>        format.xml { haml :"blobs/show" }
> -      format.json { convert_to_json(blobs, @blob) }
> +      format.json { convert_to_json(:blobs, @blob) }
>        end
>    else
>        report_error(404)
> diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
> index a075f0a..bf5c6f5 100644
> --- a/server/views/blobs/new.html.haml
> +++ b/server/views/blobs/new.html.haml
> @@ -3,13 +3,7 @@
>  %form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => 'multipart/form-data'}
>    %label
>      Blob Name:
> -    %input{ :name => 'blob_id', :size => 512}/
> -  %label
> -    Blob Data:
> -    %br
> -    %input{ :type => "file", :name => 'blob_data', :size => 50}/
> -    %br
> -    %br
> +    %input{ :name => 'blob', :size => 512}/
>    %input{ :type => "hidden", :name => "meta_params", :value => "0"}
>    %a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata
>    %div{ :id => "metadata_holder", :style => "display: none;"}
> @@ -23,4 +17,10 @@
>    %a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata
>    %br
>    %br
> +  %label
> +    Blob Data:
> +    %br
> +    %input{ :type => "file", :name => 'blob_data', :size => 50}/
> +    %br
> +    %br
>    %input{ :type => :submit, :name => "commit", :value => "create"}/




Mime
View raw message