deltacloud-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michal Fojtik <mfoj...@redhat.com>
Subject Re: [PATCH 1/4] Adds blob streaming uploads using PUT (stream through thin+deltacloud)
Date Thu, 09 Jun 2011 09:07:48 GMT
On Jun 8, 2011, at 7:09 PM, marios@redhat.com wrote:

ACK.

A few minor stylistic comments inline.

  -- Michal


> From: marios <marios@redhat.com>
> 
> 
> Signed-off-by: marios <marios@redhat.com>
> ---
> server/lib/deltacloud/drivers/ec2/ec2_driver.rb |   37 ++++++-
> server/lib/deltacloud/helpers/blob_stream.rb    |  142 ++++++++++++++++++++++-
> server/server.rb                                |   51 +++++++--
> server/views/blobs/new.html.haml                |   14 +-
> 4 files changed, 220 insertions(+), 24 deletions(-)
> 
> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> index 4edd989..d37e8ab 100644
> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> @@ -396,7 +396,7 @@ module Deltacloud
>         end
> 
>         #--
> -        # Create Blob
> +        # Create Blob - NON Streaming way (i.e. was called with POST html multipart
form data)
>         #--
>         def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
>           s3_client = new_client(credentials, :s3)
> @@ -445,6 +445,40 @@ module Deltacloud
>           end
>         end
> 
> +        #params: {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata}
> +        def blob_stream_connection(params)
> +          #canonicalise metadata:
> +          #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
> +          metadata = params[:metadata]
> +          signature_meta_string = ""
> +          unless metadata.nil?
> +            metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 'x-amz-meta-')
> +            keys_array = metadata.keys.sort!
> +            keys_array.each {|k| signature_meta_string << "#{k}:#{metadata[k]}\n"}
> +          end
> +          #s3.amazonaws.com
> +          provider = "https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}"

How about making this configurable via headers or matrix params? For example, what if I want
to use a different provider
than us-east-1?

> +          uri = URI.parse(provider)
> +          http = Net::HTTP.new("#{params[:bucket]}.#{uri.host}", uri.port )
> +          http.use_ssl = true
> +          http.verify_mode = OpenSSL::SSL::VERIFY_NONE
> +          timestamp = Time.now.httpdate
> +          string_to_sign =
> +            "PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}"
> +          auth_string = Aws::Utils::sign(params[:password], string_to_sign)
> +          request = Net::HTTP::Put.new("/#{params[:blob]}")
> +          request['Host'] = "#{params[:bucket]}.#{uri.host}"
> +          request['Date'] = timestamp
> +          request['Content-Type'] = params[:content_type]
> +          request['Content-Length'] = params[:content_length]
> +          request['Authorization'] = "AWS #{params[:user]}:#{auth_string}"
> +          request['Expect'] = "100-continue"
> +          unless metadata.nil?
> +            metadata.each{|k,v| request[k] = v}
> +          end
> +          return http, request
> +        end
> +
>         def storage_volumes(credentials, opts={})
>           ec2 = new_client( credentials )
>           volume_list = (opts and opts[:id]) ? opts[:id] : nil
> @@ -582,7 +616,6 @@ module Deltacloud
>         end
> 
>         private
> -
>         def new_client(credentials, type = :ec2)
>           klass = case type
>                     when :elb then Aws::Elb
> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
> index 00879a9..094bfe6 100644
> --- a/server/lib/deltacloud/helpers/blob_stream.rb
> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
> @@ -69,13 +69,145 @@ class Hash
>     remove = []
>     self.each_key do |key|
>       if key.to_s.match(rgx_pattern)
> -         new_key = key.to_s.gsub(rgx_pattern, replacement)
> +         new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
>          self[new_key] = self[key]
>          remove << key
> -      end #key.match
> -    end # each_key do
> +      end
> +    end
>     #remove the original keys
>     self.delete_if{|k,v| remove.include?(k)}
> -  end #def
> +  end
> +
> +end
> +
> +#Monkey patch for streaming blobs:
> +# Normally a client will upload a blob to deltacloud and thin will put
> +# this into a tempfile. Then deltacloud would stream up to the provider:
> +#   i.e.  client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->>
provider
> +# Instead we want to recognise that this is a 'PUT blob' operation and
> +# start streaming to the provider as the request is received:
> +#   i.e.  client =-->>STREAM-->> deltacloud =-->>STREAM-->>
provider
> +module Thin
> +  class Request
> +
> +    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if defined?(Thin::Response)
> +    private
> +      def move_body_to_tempfile
> +        if BlobStreamIO::is_put_blob(self)
> +          @body = BlobStreamIO.new(self)
> +        else
> +          move_body_to_tempfile_orig
> +        end
> +      end
> +
> +  end
> +end
> +
> +require 'net/http'
> +#monkey patch for Net:HTTP
> +module Net
> +  class HTTP
> +
> +    alias :request_orig :request
> +
> +    def request(req, body = nil, blob_stream = nil, &block)
> +      unless blob_stream
> +        return request_orig(req, body, &block)
> +      end
> +      @blob_req = req
> +      do_start #start the connection
> +
> +      req.set_body_internal body
> +      begin_transport req
> +      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
> +      @socket
> +    end
> +
> +    class Put < HTTPRequest
> +      def write_header_m(sock, ver, path)
> +        write_header(sock, ver, path)
> +      end
> +    end
> +
> +    def end_request
> +      begin
> +        res = HTTPResponse.read_new(@socket)
> +      end while res.kind_of?(HTTPContinue)
> +      res.reading_body(@socket, @blob_req.response_body_permitted?) {
> +                                          yield res if block_given? }
> +      end_transport @blob_req, res
> +      do_finish
> +      res
> +    end
> +  end
> +
> +end
> +
> +require 'base64'
> +class BlobStreamIO
> +
> +  attr_accessor :size, :provider, :sock
> +
> +  def initialize(request)
> +    @client_request = request
> +    @size = 0
> +    bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
> +    user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
> +    content_type = request.env['CONTENT_TYPE'] || ""
> +    #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
> +    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +    user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase] =
array.last; result}
> +    @content_length = request.env['CONTENT_LENGTH']
> +    @http, provider_request = driver.blob_stream_connection({:user=>user,
> +       :password=>password, :bucket=>bucket, :blob=>blob, :metadata=> user_meta,
> +       :content_type=>content_type, :content_length=>@content_length })
> +    @content_length = @content_length.to_i #for comparison of size in '<< (data)'
> +    @sock = @http.request(provider_request, nil, true)
> +  end
> +
> +  def << (data)
> +    @sock.write(data)
> +    @size += data.length
> +    if (@size >= @content_length)
> +      result = @http.end_request
> +      if result.is_a?(Net::HTTPSuccess)
> +        @client_request.env["BLOB_SUCCESS"] = "true"
> +      else
> +        @client_request.env["BLOB_FAIL"] = result.body
> +      end
> +    end
> +  end
> 
> -end #class
> +  def rewind
> +  end
> +
> +  #use the Request.env hash (populated by the ThinParser) to determine whether
> +  #this is a post blob operation. By definition, only get here with a body of
> +  # > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
> +  def self.is_put_blob(request = nil)
> +    path = request.env['PATH_INFO']
> +    method = request.env['REQUEST_METHOD']
> +    if ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/
&& method == 'PUT' )
> +      return true
> +    else
> +      return false
> +    end

The 'if' construction here is not necessary, since the condition itself already evaluates to a truthy/falsy value:

( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/ &&
method == 'PUT' )


> +  end
> +
> +  private
> +
> +  def parse_bucket_blob(request_string)
> +    array = request_string.split("/")
> +    blob = array.pop
> +    bucket = array.pop
> +    return bucket, blob
> +  end
> +
> +  def parse_credentials(request_string)
> +    decoded = Base64.decode64(request_string.split('Basic ').last)
> +    key = decoded.split(':').first
> +    pass = decoded.split(':').last
> +    return key, pass
> +  end
> +
> +end
> diff --git a/server/server.rb b/server/server.rb
> index e332679..32d0e33 100644
> --- a/server/server.rb
> +++ b/server/server.rb
> @@ -696,13 +696,47 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob"
do
>   end
> end
> 
> -#create a new blob
> +#create a new blob using PUT - streams through deltacloud
> +put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
> +  if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
> +    content_type = env["CONTENT_TYPE"]
> +    content_type ||=  ""
> +    @blob = driver.blob(credentials, {:id => params[:blob],
> +                                      'bucket' => params[:bucket]})

It's better to use a Symbol as the Hash key instead of a String.

> +    respond_to do |format|
> +      format.html { haml :"blobs/show" }
> +      format.xml { haml :"blobs/show" }
> +      format.json { convert_to_json(:blobs, @blob) }
> +    end
> +  elsif(env["BLOB_FAIL"])
> +    report_error(500) #OK?
> +  else # small blobs - < 112kb dont hit the streaming monkey patch - use 'normal'
create_blob
> +       # also, if running under webrick don't hit the streaming patch (Thin specific)
> +    bucket_id = params[:bucket]
> +    blob_id = params[:blob]
> +    temp_file = Tempfile.new("temp_blob_file")
> +    temp_file.write(env['rack.input'].read)
> +    temp_file.flush
> +    content_type = env['CONTENT_TYPE'] || ""
> +    blob_data = {:tempfile => temp_file, :type => content_type}
> +    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +    user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase] =
array.last; result}
> +    @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
> +    temp_file.delete
> +    respond_to do |format|
> +      format.html { haml :"blobs/show"}
> +      format.xml { haml :"blobs/show" }
> +    end
> +  end
> +end
> +
> +#create a new blob using html interface - NON STREAMING (i.e. browser POST http form
data)
> post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
>   bucket_id = params[:bucket]
> -  blob_id = params['blob_id']
> +  blob_id = params['blob']
>   blob_data = params['blob_data']
>   user_meta = {}
> -#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
> +  #metadata from params (i.e., passed by http form post, e.g. browser)
>   max = params[:meta_params]
>   if(max)
>     (1..max.to_i).each do |i|
> @@ -710,11 +744,8 @@ post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket"
do
>       key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
>       value = params[:"meta_value#{i}"]
>       user_meta[key] = value
> -    end #max.each do
> -  else #can try to get blob_metadata from http headers
> -    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> -    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> -  end #end if
> +    end
> +  end
>   @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
>   respond_to do |format|
>     format.html { haml :"blobs/show"}
> @@ -730,7 +761,7 @@ delete "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>   respond_to do |format|
>     format.xml {  204 }
>     format.json {  204 }
> -    format.html { bucket_url(bucket_id) }
> +    format.html { redirect(bucket_url(bucket_id)) }
>   end
> end
> 
> @@ -768,7 +799,7 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>     respond_to do |format|
>       format.html { haml :"blobs/show" }
>       format.xml { haml :"blobs/show" }
> -      format.json { convert_to_json(blobs, @blob) }
> +      format.json { convert_to_json(:blobs, @blob) }
>       end
>   else
>       report_error(404)
> diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
> index a075f0a..bf5c6f5 100644
> --- a/server/views/blobs/new.html.haml
> +++ b/server/views/blobs/new.html.haml
> @@ -3,13 +3,7 @@
> %form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => 'multipart/form-data'}
>   %label
>     Blob Name:
> -    %input{ :name => 'blob_id', :size => 512}/
> -  %label
> -    Blob Data:
> -    %br
> -    %input{ :type => "file", :name => 'blob_data', :size => 50}/
> -    %br
> -    %br
> +    %input{ :name => 'blob', :size => 512}/
>   %input{ :type => "hidden", :name => "meta_params", :value => "0"}
>   %a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata
>   %div{ :id => "metadata_holder", :style => "display: none;"}
> @@ -23,4 +17,10 @@
>   %a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata
>   %br
>   %br
> +  %label
> +    Blob Data:
> +    %br
> +    %input{ :type => "file", :name => 'blob_data', :size => 50}/
> +    %br
> +    %br
>   %input{ :type => :submit, :name => "commit", :value => "create"}/
> -- 
> 1.7.3.4
> 

------------------------------------------------------
Michal Fojtik, mfojtik@redhat.com
Deltacloud API: http://deltacloud.org


Mime
View raw message