incubator-deltacloud-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "marios@redhat.com" <mandr...@redhat.com>
Subject Re: [PATCH] Adds blob streaming uploads using PUT (stream through thin+deltacloud)
Date Wed, 08 Jun 2011 17:01:19 GMT
OK, thanks for the review — I am sending out a new patch series. I have
addressed your comments inline:


On 08/06/11 11:52, David Lutterkort wrote:
> On Tue, 2011-06-07 at 20:45 +0300, marios@redhat.com wrote:
>> From: marios<marios@redhat.com>
>>
>>
>> Signed-off-by: marios<marios@redhat.com>
>
> Almost ACK; there are a few stylistic things that should be done to
> simplify the code:
>
>> ---
>>   server/lib/deltacloud/drivers/ec2/ec2_driver.rb |   51 +++++++-
>>   server/lib/deltacloud/helpers/blob_stream.rb    |  155 +++++++++++++++++++++--
>>   server/server.rb                                |   56 +++++++--
>>   server/views/blobs/new.html.haml                |   14 +-
>>   4 files changed, 242 insertions(+), 34 deletions(-)
>>
>> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
>> index 4edd989..7f1e021 100644
>> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
>> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
>> @@ -355,9 +355,9 @@ module Deltacloud
>>                 bucket_list = s3_client.buckets
>>                 bucket_list.each do |current|
>>                   buckets<<  Bucket.new({:name =>  current.name, :id =>
 current.name})
>> -              end #bucket_list.each
>> -            end #if
>> -          end #safely
>> +              end
>> +            end
>> +          end
>>             filter_on(buckets, :id, opts)
>>           end
>>
>> @@ -387,8 +387,12 @@ module Deltacloud
>>             blobs = []
>>             safely do
>>               s3_bucket = s3_client.bucket(opts['bucket'])
>> -            s3_bucket.keys({}, true).each do |s3_object|
>> -              blobs<<  convert_object(s3_object)
>> +            unless(opts[:id]).nil?
>
> Better: 'if opts[:id]'
>
>> +              blobs<<  convert_object(s3_bucket.key(opts[:id], true))
>> +            else
>> +              s3_bucket.keys({}, true).each do |s3_object|
>> +                blobs<<  convert_object(s3_object)
>> +              end
>>               end
>>             end
>>             blobs = filter_on(blobs, :id, opts)
>
> This and the previous hunk really belong into a different patch.

===> OK, patches 2 and 3 in the new series



>
>> @@ -396,7 +400,7 @@ module Deltacloud
>>           end
>>
>>           #--
>> -        # Create Blob
>> +        # Create Blob - NON Streaming way (i.e. was called with POST html multipart
form data)
>>           #--
>>           def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
>>             s3_client = new_client(credentials, :s3)
>> @@ -445,6 +449,40 @@ module Deltacloud
>>             end
>>           end
>>
>> +        #params: {'user','password','bucket','blob','content_type', 'content_length',
'metadata'}
>> +        def blob_stream_connection(params)
>> +          #canonicalise metadata:
>> +          #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
>> +          metadata = params['metadata']
>> +          signature_meta_string = ""
>> +          unless metadata.nil?
>> +            metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 'x-amz-meta-')
>> +            keys_array = metadata.keys.sort!
>> +            keys_array.each {|k| signature_meta_string<<  "#{k}:#{metadata[k]}\n"}
>> +          end
>> +          #s3.amazonaws.com
>> +          provider = "https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}"
>
> This should use endpoint_for_service so that provider selection works as
> expected.

===> The problem is that we really MUST use 'https://s3.amazonaws.com' (i.e.
the us-east endpoint) for PUTting blobs. Amazon redirects the PUT to
the appropriate endpoint depending on the bucket location (e.g. to
s3-eu-west-1.amazonaws.com for European buckets). I tried putting
directly to the European endpoint, but that failed. The 'endpoint_for_service'
method checks
"(Thread.current[:provider] || ENV['API_PROVIDER'] || DEFAULT_REGION)"
for the endpoint, so using it here could select a region other than us-east-1.


>
>> +          uri = URI.parse(provider)
>> +          http = Net::HTTP.new("#{params['bucket']}.#{uri.host}", uri.port )
>> +          http.use_ssl = true
>> +          http.verify_mode = OpenSSL::SSL::VERIFY_NONE
>> +          timestamp = Time.now.httpdate
>> +          string_to_sign =
>> +            "PUT\n\n#{params['content_type']}\n#{timestamp}\n#{signature_meta_string}/#{params['bucket']}/#{params['blob']}"
>> +          auth_string = Aws::Utils::sign(params['password'], string_to_sign)
>> +          request = Net::HTTP::Put.new("/#{params['blob']}")
>> +          request['Host'] = "#{params['bucket']}.#{uri.host}"
>> +          request['Date'] = timestamp
>> +          request['Content-Type'] = params['content_type']
>> +          request['Content-Length'] = params['content_length']
>> +          request['Authorization'] = "AWS #{params['user']}:#{auth_string}"
>> +          request['Expect'] = "100-continue"
>> +          unless metadata.nil?
>> +            metadata.each{|k,v| request[k] = v}
>> +          end
>> +          return http, request
>> +        end
>> +
>>           def storage_volumes(credentials, opts={})
>>             ec2 = new_client( credentials )
>>             volume_list = (opts and opts[:id]) ? opts[:id] : nil
>> @@ -582,7 +620,6 @@ module Deltacloud
>>           end
>>
>>           private
>> -
>>           def new_client(credentials, type = :ec2)
>>             klass = case type
>>                       when :elb then Aws::Elb
>> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
>> index 00879a9..355a66a 100644
>> --- a/server/lib/deltacloud/helpers/blob_stream.rb
>> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
>> @@ -17,7 +17,8 @@ include Deltacloud
>>   begin
>>     require 'eventmachine'
>>     #--
>> -  # based on the example from http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
>> +  # based on the example from
>> +  #   http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
>>     #--
>>     class BlobStream
>>       AsyncResponse = [-1, {}, []].freeze
>> @@ -32,11 +33,11 @@ begin
>>           'Content-Disposition' =>  params["content_disposition"],
>>           'Content-Length' =>  "#{params['content_length']}"}, body]
>>         }
>> -      #call the driver from here. the driver method yields for every chunk of blob
it receives. We then
>> -      #use body.call to write that chunk as received.
>> +      #call the driver from here. the driver method yields for every chunk
>> +      #of blob it receives. Then use body.call to write that chunk as received.
>>         driver.blob_data(credentials, params[:bucket], params[:blob], params) {|chunk|
body.call ["#{chunk}"]} #close blob_data block
>>         body.succeed
>> -      AsyncResponse # Tells Thin to not close the connection and continue it's work
on other request
>> +      AsyncResponse # Tell Thin to not close connection&  work other requests
>>       end
>>     end
>
> The last two hunks only change formatting and should really go into a
> separate patch.

===> OK, patch 3 in the new series.

>
>> @@ -69,13 +70,149 @@ class Hash
>>       remove = []
>>       self.each_key do |key|
>>         if key.to_s.match(rgx_pattern)
>> -         new_key = key.to_s.gsub(rgx_pattern, replacement)
>> +         new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
>>            self[new_key] = self[key]
>>            remove<<  key
>> -      end #key.match
>> -    end # each_key do
>> +      end
>> +    end
>>       #remove the original keys
>>       self.delete_if{|k,v| remove.include?(k)}
>> -  end #def
>> +  end
>> +
>> +end
>> +
>> +#Monkey patch for streaming blobs:
>> +# Normally a client will upload a blob to deltacloud and thin will put
>> +# this into a tempfile. Then deltacloud would stream up to the provider:
>> +#   i.e.  client =-->>TEMP_FILE-->>  deltacloud =-->>STREAM-->>
 provider
>> +# Instead we want to recognise that this is a 'PUT blob' operation and
>> +# start streaming to the provider as the request is received:
>> +#   i.e.  client =-->>STREAM-->>  deltacloud =-->>STREAM-->>
 provider
>> +module Thin
>> +  class Request
>> +
>> +    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if defined?(Thin::Response)
>
> Why does this need to be made conditional? If we are not running under
> thin, it doesn't really matter what move_body_to_tempfile does, since
> e.g. Webrick will never call it.
>

===> Basically, I uninstalled Thin entirely (i.e. the thin gem) in order
to test this code under WEBrick. In that case the 'alias_method' call failed,
since Thin::Request#move_body_to_tempfile does not exist
(./server/bin/../lib/deltacloud/helpers/blob_stream.rb:94:in
`alias_method': undefined method `move_body_to_tempfile' for class
`Thin::Request' (NameError)).


>> +    private
>> +      def move_body_to_tempfile
>> +        if BlobStreamIO::is_put_blob(self)
>> +          @body = BlobStreamIO.new(self)
>> +        else
>> +          move_body_to_tempfile_orig
>> +        end
>> +      end
>> +
>> +  end
>> +end
>> +
>> +require 'net/http'
>> +#monkey patch for Net:HTTP
>> +module Net
>> +  class HTTP
>>
>> -end #class
>> +    alias :request_orig :request
>> +
>> +    @blob_req # needs global scope for close op later
>
> That line isn't needed.
>

Done (sorry, bad habits from my Java days...).


>> +    def request(req, body = nil, blob_stream = nil,&block)
>> +      unless blob_stream
>> +        return request_orig(req, body,&block)
>> +      end
>> +      @blob_req = req
>> +      do_start #start the connection
>> +
>> +      req.set_body_internal body
>> +      begin_transport req
>> +      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
>> +      @socket
>> +    end
>> +
>> +    class Put<  HTTPRequest
>> +      def write_header_m(sock, ver, path)
>> +        write_header(sock, ver, path)
>> +      end
>> +    end
>> +
>> +    def end_request
>> +      begin
>> +      res = HTTPResponse.read_new(@socket)
>> +      end while res.kind_of?(HTTPContinue)
>> +      res.reading_body(@socket, @blob_req.response_body_permitted?) {
>> +        yield res if block_given?
>> +       }
>> +  end_transport @blob_req, res
>
> Spaces, please
>

OK, done.


>> +      do_finish
>> +      res
>> +    end
>> +  end
>> +
>> +end
>> +
>> +require 'base64'
>> +class BlobStreamIO
>> +
>> +  attr_accessor :size, :provider, :sock
>> +
>> +  def initialize(request)
>> +    @client_request = request
>> +    @size = 0
>> +    bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
>> +    user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
>> +    content_type = request.env['CONTENT_TYPE'] || ""
>> +    #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
>> +    user_meta = {}
>> +    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
>> +    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
>
> The inject here doesn't do anything, since you never change result, and
> never use the return value. Either use each or sth like
>
>          user_meta = meta_array.inject({}){ |res, kv| res[kv.first.upcase] = kv.last;
res }
>
thanks, done


> Also, there's a lot of blob meta handling sprinkled through the code
> (here, ec2 driver etc.) Not for this patch, but in general, it would be
> cleaner to have some helper methods to deal with that. For example, a
> helper method 'extract_blob_meta(headers, opts)' that does the above,
> and takes an option to rename keys, so that in the ec2_driver you can
> call
>
>          headers = extract_blob_meta(request.env, :rename =>
>          "x-amz-meta-")

OK, I created a new BlobHelper module in blob_stream.rb (patch 4 in the
new series).

>
>> +    @content_length = request.env['CONTENT_LENGTH']
>> +    @http, provider_request = driver.blob_stream_connection({'user'=>user,
>> +       'password'=>password, 'bucket'=>bucket, 'blob'=>blob, 'metadata'=>
 user_meta,
>> +       'content_type'=>content_type, 'content_length'=>@content_length })
>
> Minor stylistic nit: it looks a little funky to use string keys here
> instead of symbols.

done

>
>> +    @content_length = @content_length.to_i #for comparison of size in '<<
 (data)'
>> +    @sock = @http.request(provider_request, nil, true)
>> +  end
>> +
>> +  def<<  (data)
>> +    @sock.write(data)
>> +    @size += data.length
>> +    if (@size>= @content_length)
>> +      result = @http.end_request
>> +      if result.is_a?(Net::HTTPSuccess)
>> +        @client_request.env["BLOB_SUCCESS"] = "true"
>> +      else
>> +        @client_request.env["BLOB_FAIL"] = result.body
>> +      end
>> +    end
>> +  end
>> +
>> +  def rewind
>> +  end
>> +
>> +  #use the Request.env hash (populated by the ThinParser) to determine whether
>> +  #this is a post blob operation. By definition, only get here with a body of
>> +  #>  112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
>> +  def self.is_put_blob(request = nil)
>> +    path = request.env['PATH_INFO']
>> +    method = request.env['REQUEST_METHOD']
>> +    if ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/&&
 method == 'PUT' )
>> +      return true
>> +    else
>> +      return false
>> +    end
>> +  end
>> +
>> +  private
>> +
>> +  def parse_bucket_blob(request_string)
>> +    array = request_string.split("/")
>> +    blob = array.pop
>> +    bucket = array.pop
>> +    return bucket, blob
>> +  end
>> +
>> +  def parse_credentials(request_string)
>> +    decoded = Base64.decode64(request_string.split('Basic ').last)
>> +    key = decoded.split(':').first
>> +    pass = decoded.split(':').last
>> +    return key, pass
>> +  end
>> +
>> +end
>> diff --git a/server/server.rb b/server/server.rb
>> index e332679..a8f5ee0 100644
>> --- a/server/server.rb
>> +++ b/server/server.rb
>> @@ -696,13 +696,46 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob"
do
>>     end
>>   end
>>
>> -#create a new blob
>> +#create a new blob using PUT - streams through deltacloud
>> +put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
>> +  if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
>> +    content_type = env["CONTENT_TYPE"]
>> +    content_type ||=  ""
>> +    @blob = driver.blob(credentials, {:id =>  params[:blob],
>> +                                      'bucket' =>  params[:bucket]})
>> +    respond_to do |format|
>> +      format.html { haml :"blobs/show" }
>> +      format.xml { haml :"blobs/show" }
>> +      format.json { convert_to_json(:blobs, @blob) }
>> +    end
>> +  elsif(env["BLOB_FAIL"])
>> +    report_error(500) #OK?
>> +  else # small blobs -<  112kb dont hit the streaming monkey patch - redirect
to POST rule
>> +       # also, if running under webrick don't hit the streaming patch (Thin specific)
>> +    path = bucket_url(params[:bucket])
>> +    status, headers, body = call! env.merge({"PATH_INFO" =>  "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/#{params[:bucket]}",
>> +                                             "REQUEST_METHOD" =>  "POST",
>> +                                             "DELTACLOUD_BLOB_NAME" =>  params[:blob]})
>
> Could we avoid calling the POST rule if we factored the things that both
> PUT and POST need into a helper method ?

===> Unfortunately, no. The two differ in how they obtain the params for
the driver.create_blob operation: how they get the metadata (HTTP form
POST vs. HTTP headers) and how they get the blob data (HTTP form POST vs.
data given in rack.input, which we have to write into a tempfile). The
only thing the two methods have in common is the call to
'driver.create_blob' once the params are determined. However, to avoid
calling the POST rule from PUT (and hence the requirement for Sinatra 1.2.4),
I simply put the code directly into the PUT method.


>
>> +  end
>> +end
>> +
>> +#create a new blob using html interface - NON STREAMING (i.e. browser POST http
form data)
>>   post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
>>     bucket_id = params[:bucket]
>> -  blob_id = params['blob_id']
>> -  blob_data = params['blob_data']
>> +  #check if we were passed here from the PUT method - with a small blob (<112kb)
>> +  if(env['DELTACLOUD_BLOB_NAME'])
>> +    blob_id = env['DELTACLOUD_BLOB_NAME']
>> +    temp_file = Tempfile.new("temp_blob_file")
>> +    temp_file.write(env['rack.input'].read)
>> +    temp_file.flush
>> +    content_type = env['CONTENT_TYPE'] || ""
>> +    blob_data = {:tempfile =>  temp_file, :type =>  content_type}
>> +  else
>> +    blob_id = params['blob']
>> +    blob_data = params['blob_data']
>> +  end
>
> This seems too complicated to me ... if we have to jump through hoops to
> get to the POST handler, and then through more hoops to have the POST
> handler detect we are coming from PUT, we really should be splitting
> stuff out into helper method(s)

See my reply to the previous comment.


>
>>     user_meta = {}
>> -#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
>> +  #first try metadata from params (i.e., passed by http form post, e.g. browser)
>>     max = params[:meta_params]
>>     if(max)
>>       (1..max.to_i).each do |i|
>> @@ -710,12 +743,13 @@ post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket"
do
>>         key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
>>         value = params[:"meta_value#{i}"]
>>         user_meta[key] = value
>> -    end #max.each do
>> -  else #can try to get blob_metadata from http headers
>> -    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
>> -    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
>> -  end #end if
>> +    end
>> +  else #can try to get blob_metadata from http headers (i.e. from PUT)
>> +      meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
>> +      meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
>
> Another potential user of extract_blob_meta ;)

done, patch 4/4

>
>> +  end
>>     @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
>> +  temp_file.delete if temp_file
>>     respond_to do |format|
>>       format.html { haml :"blobs/show"}
>>       format.xml { haml :"blobs/show" }
>> @@ -730,7 +764,7 @@ delete "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>>     respond_to do |format|
>>       format.xml {  204 }
>>       format.json {  204 }
>> -    format.html { bucket_url(bucket_id) }
>> +    format.html { redirect(bucket_url(bucket_id)) }
>>     end
>>   end
>>
>> @@ -768,7 +802,7 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>>       respond_to do |format|
>>         format.html { haml :"blobs/show" }
>>         format.xml { haml :"blobs/show" }
>> -      format.json { convert_to_json(blobs, @blob) }
>> +      format.json { convert_to_json(:blobs, @blob) }
>>         end
>>     else
>>         report_error(404)
>> diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
>> index a075f0a..bf5c6f5 100644
>> --- a/server/views/blobs/new.html.haml
>> +++ b/server/views/blobs/new.html.haml
>> @@ -3,13 +3,7 @@
>>   %form{ :action =>  bucket_url(@bucket_id), :method =>  :post, :enctype =>
 'multipart/form-data'}
>>     %label
>>       Blob Name:
>> -    %input{ :name =>  'blob_id', :size =>  512}/
>> -  %label
>> -    Blob Data:
>> -    %br
>> -    %input{ :type =>  "file", :name =>  'blob_data', :size =>  50}/
>> -    %br
>> -    %br
>> +    %input{ :name =>  'blob', :size =>  512}/
>>     %input{ :type =>  "hidden", :name =>  "meta_params", :value =>  "0"}
>>     %a{ :href =>  "javascript:;", :onclick =>  "more_fields();"} Add Metadata
>>     %div{ :id =>  "metadata_holder", :style =>  "display: none;"}
>> @@ -23,4 +17,10 @@
>>     %a{ :href =>  "javascript:;", :onclick =>  "less_fields();"} Less Metadata
>>     %br
>>     %br
>> +  %label
>> +    Blob Data:
>> +    %br
>> +    %input{ :type =>  "file", :name =>  'blob_data', :size =>  50}/
>> +    %br
>> +    %br
>>     %input{ :type =>  :submit, :name =>  "commit", :value =>  "create"}/
>
>
>


Mime
View raw message