deltacloud-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "marios@redhat.com" <mandr...@redhat.com>
Subject Re: [PATCH 1/4] Adds blob streaming uploads using PUT (stream through thin+deltacloud)
Date Thu, 09 Jun 2011 09:11:22 GMT
On 09/06/11 12:07, Michal Fojtik wrote:
> On Jun 8, 2011, at 7:09 PM, marios@redhat.com wrote:
>
> ACK.
>
> A few minor stylistic comments inline.

ok thanks - David had the same comment in the previous version - 
explained below:

>
>    -- Michal
>
>
>> From: marios<marios@redhat.com>
>>
>>
>> Signed-off-by: marios<marios@redhat.com>
>> ---
>> server/lib/deltacloud/drivers/ec2/ec2_driver.rb |   37 ++++++-
>> server/lib/deltacloud/helpers/blob_stream.rb    |  142 ++++++++++++++++++++++-
>> server/server.rb                                |   51 +++++++--
>> server/views/blobs/new.html.haml                |   14 +-
>> 4 files changed, 220 insertions(+), 24 deletions(-)
>>
>> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
>> index 4edd989..d37e8ab 100644
>> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
>> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
>> @@ -396,7 +396,7 @@ module Deltacloud
>>          end
>>
>>          #--
>> -        # Create Blob
>> +        # Create Blob - NON Streaming way (i.e. was called with POST html multipart
form data)
>>          #--
>>          def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
>>            s3_client = new_client(credentials, :s3)
>> @@ -445,6 +445,40 @@ module Deltacloud
>>            end
>>          end
>>
>> +        #params: {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata}
>> +        def blob_stream_connection(params)
>> +          #canonicalise metadata:
>> +          #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
>> +          metadata = params[:metadata]
>> +          signature_meta_string = ""
>> +          unless metadata.nil?
>> +            metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 'x-amz-meta-')
>> +            keys_array = metadata.keys.sort!
>> +            keys_array.each {|k| signature_meta_string<<  "#{k}:#{metadata[k]}\n"}
>> +          end
>> +          #s3.amazonaws.com
>> +          provider = "https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}"
>
> How about making this configurable by headers or matrix params? E.g. if I want to use a
> different provider than us-east-1?
>

===> The problem is that we really MUST use 'https://s3.amazonaws.com' (i.e. 
the us-east endpoint) for putting blobs. Amazon redirects your PUT to 
the appropriate endpoint depending on your bucket location (e.g. to 
s3-eu-west-1.amazonaws.com for European buckets). I tried putting 
directly to the European endpoint but it failed. The 'endpoint_for_service' 
method checks
"(Thread.current[:provider] || ENV['API_PROVIDER'] || DEFAULT_REGION)" 
for the endpoint.


>> +          uri = URI.parse(provider)
>> +          http = Net::HTTP.new("#{params[:bucket]}.#{uri.host}", uri.port )
>> +          http.use_ssl = true
>> +          http.verify_mode = OpenSSL::SSL::VERIFY_NONE
>> +          timestamp = Time.now.httpdate
>> +          string_to_sign =
>> +            "PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}"
>> +          auth_string = Aws::Utils::sign(params[:password], string_to_sign)
>> +          request = Net::HTTP::Put.new("/#{params[:blob]}")
>> +          request['Host'] = "#{params[:bucket]}.#{uri.host}"
>> +          request['Date'] = timestamp
>> +          request['Content-Type'] = params[:content_type]
>> +          request['Content-Length'] = params[:content_length]
>> +          request['Authorization'] = "AWS #{params[:user]}:#{auth_string}"
>> +          request['Expect'] = "100-continue"
>> +          unless metadata.nil?
>> +            metadata.each{|k,v| request[k] = v}
>> +          end
>> +          return http, request
>> +        end
>> +
>>          def storage_volumes(credentials, opts={})
>>            ec2 = new_client( credentials )
>>            volume_list = (opts and opts[:id]) ? opts[:id] : nil
>> @@ -582,7 +616,6 @@ module Deltacloud
>>          end
>>
>>          private
>> -
>>          def new_client(credentials, type = :ec2)
>>            klass = case type
>>                      when :elb then Aws::Elb
>> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
>> index 00879a9..094bfe6 100644
>> --- a/server/lib/deltacloud/helpers/blob_stream.rb
>> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
>> @@ -69,13 +69,145 @@ class Hash
>>      remove = []
>>      self.each_key do |key|
>>        if key.to_s.match(rgx_pattern)
>> -         new_key = key.to_s.gsub(rgx_pattern, replacement)
>> +         new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
>>           self[new_key] = self[key]
>>           remove<<  key
>> -      end #key.match
>> -    end # each_key do
>> +      end
>> +    end
>>      #remove the original keys
>>      self.delete_if{|k,v| remove.include?(k)}
>> -  end #def
>> +  end
>> +
>> +end
>> +
>> +#Monkey patch for streaming blobs:
>> +# Normally a client will upload a blob to deltacloud and thin will put
>> +# this into a tempfile. Then deltacloud would stream up to the provider:
>> +#   i.e.  client =-->>TEMP_FILE-->>  deltacloud =-->>STREAM-->>
 provider
>> +# Instead we want to recognise that this is a 'PUT blob' operation and
>> +# start streaming to the provider as the request is received:
>> +#   i.e.  client =-->>STREAM-->>  deltacloud =-->>STREAM-->>
 provider
>> +module Thin
>> +  class Request
>> +
>> +    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if defined?(Thin::Response)
>> +    private
>> +      def move_body_to_tempfile
>> +        if BlobStreamIO::is_put_blob(self)
>> +          @body = BlobStreamIO.new(self)
>> +        else
>> +          move_body_to_tempfile_orig
>> +        end
>> +      end
>> +
>> +  end
>> +end
>> +
>> +require 'net/http'
>> +#monkey patch for Net:HTTP
>> +module Net
>> +  class HTTP
>> +
>> +    alias :request_orig :request
>> +
>> +    def request(req, body = nil, blob_stream = nil,&block)
>> +      unless blob_stream
>> +        return request_orig(req, body,&block)
>> +      end
>> +      @blob_req = req
>> +      do_start #start the connection
>> +
>> +      req.set_body_internal body
>> +      begin_transport req
>> +      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
>> +      @socket
>> +    end
>> +
>> +    class Put<  HTTPRequest
>> +      def write_header_m(sock, ver, path)
>> +        write_header(sock, ver, path)
>> +      end
>> +    end
>> +
>> +    def end_request
>> +      begin
>> +        res = HTTPResponse.read_new(@socket)
>> +      end while res.kind_of?(HTTPContinue)
>> +      res.reading_body(@socket, @blob_req.response_body_permitted?) {
>> +                                          yield res if block_given? }
>> +      end_transport @blob_req, res
>> +      do_finish
>> +      res
>> +    end
>> +  end
>> +
>> +end
>> +
>> +require 'base64'
>> +class BlobStreamIO
>> +
>> +  attr_accessor :size, :provider, :sock
>> +
>> +  def initialize(request)
>> +    @client_request = request
>> +    @size = 0
>> +    bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
>> +    user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
>> +    content_type = request.env['CONTENT_TYPE'] || ""
>> +    #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
>> +    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
>> +    user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase]
= array.last; result}
>> +    @content_length = request.env['CONTENT_LENGTH']
>> +    @http, provider_request = driver.blob_stream_connection({:user=>user,
>> +       :password=>password, :bucket=>bucket, :blob=>blob, :metadata=>
 user_meta,
>> +       :content_type=>content_type, :content_length=>@content_length })
>> +    @content_length = @content_length.to_i #for comparison of size in '<<
 (data)'
>> +    @sock = @http.request(provider_request, nil, true)
>> +  end
>> +
>> +  def<<  (data)
>> +    @sock.write(data)
>> +    @size += data.length
>> +    if (@size>= @content_length)
>> +      result = @http.end_request
>> +      if result.is_a?(Net::HTTPSuccess)
>> +        @client_request.env["BLOB_SUCCESS"] = "true"
>> +      else
>> +        @client_request.env["BLOB_FAIL"] = result.body
>> +      end
>> +    end
>> +  end
>>
>> -end #class
>> +  def rewind
>> +  end
>> +
>> +  #use the Request.env hash (populated by the ThinParser) to determine whether
>> +  #this is a post blob operation. By definition, only get here with a body of
>> +  #>  112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
>> +  def self.is_put_blob(request = nil)
>> +    path = request.env['PATH_INFO']
>> +    method = request.env['REQUEST_METHOD']
>> +    if ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/&&
 method == 'PUT' )
>> +      return true
>> +    else
>> +      return false
>> +    end
>
> The 'if' construction here is not necessary, since the condition will return true/false anyway:
>
> ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/&&
 method == 'PUT' )
>
>
>> +  end
>> +
>> +  private
>> +
>> +  def parse_bucket_blob(request_string)
>> +    array = request_string.split("/")
>> +    blob = array.pop
>> +    bucket = array.pop
>> +    return bucket, blob
>> +  end
>> +
>> +  def parse_credentials(request_string)
>> +    decoded = Base64.decode64(request_string.split('Basic ').last)
>> +    key = decoded.split(':').first
>> +    pass = decoded.split(':').last
>> +    return key, pass
>> +  end
>> +
>> +end
>> diff --git a/server/server.rb b/server/server.rb
>> index e332679..32d0e33 100644
>> --- a/server/server.rb
>> +++ b/server/server.rb
>> @@ -696,13 +696,47 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob"
do
>>    end
>> end
>>
>> -#create a new blob
>> +#create a new blob using PUT - streams through deltacloud
>> +put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
>> +  if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
>> +    content_type = env["CONTENT_TYPE"]
>> +    content_type ||=  ""
>> +    @blob = driver.blob(credentials, {:id =>  params[:blob],
>> +                                      'bucket' =>  params[:bucket]})
>
> It's better to use a Symbol as the Hash key instead of a String.
>
>> +    respond_to do |format|
>> +      format.html { haml :"blobs/show" }
>> +      format.xml { haml :"blobs/show" }
>> +      format.json { convert_to_json(:blobs, @blob) }
>> +    end
>> +  elsif(env["BLOB_FAIL"])
>> +    report_error(500) #OK?
>> +  else # small blobs -<  112kb dont hit the streaming monkey patch - use 'normal'
create_blob
>> +       # also, if running under webrick don't hit the streaming patch (Thin specific)
>> +    bucket_id = params[:bucket]
>> +    blob_id = params[:blob]
>> +    temp_file = Tempfile.new("temp_blob_file")
>> +    temp_file.write(env['rack.input'].read)
>> +    temp_file.flush
>> +    content_type = env['CONTENT_TYPE'] || ""
>> +    blob_data = {:tempfile =>  temp_file, :type =>  content_type}
>> +    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
>> +    user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase]
= array.last; result}
>> +    @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
>> +    temp_file.delete
>> +    respond_to do |format|
>> +      format.html { haml :"blobs/show"}
>> +      format.xml { haml :"blobs/show" }
>> +    end
>> +  end
>> +end
>> +
>> +#create a new blob using html interface - NON STREAMING (i.e. browser POST http
form data)
>> post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
>>    bucket_id = params[:bucket]
>> -  blob_id = params['blob_id']
>> +  blob_id = params['blob']
>>    blob_data = params['blob_data']
>>    user_meta = {}
>> -#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
>> +  #metadata from params (i.e., passed by http form post, e.g. browser)
>>    max = params[:meta_params]
>>    if(max)
>>      (1..max.to_i).each do |i|
>> @@ -710,11 +744,8 @@ post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket"
do
>>        key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
>>        value = params[:"meta_value#{i}"]
>>        user_meta[key] = value
>> -    end #max.each do
>> -  else #can try to get blob_metadata from http headers
>> -    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
>> -    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
>> -  end #end if
>> +    end
>> +  end
>>    @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
>>    respond_to do |format|
>>      format.html { haml :"blobs/show"}
>> @@ -730,7 +761,7 @@ delete "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>>    respond_to do |format|
>>      format.xml {  204 }
>>      format.json {  204 }
>> -    format.html { bucket_url(bucket_id) }
>> +    format.html { redirect(bucket_url(bucket_id)) }
>>    end
>> end
>>
>> @@ -768,7 +799,7 @@ get "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob"
do
>>      respond_to do |format|
>>        format.html { haml :"blobs/show" }
>>        format.xml { haml :"blobs/show" }
>> -      format.json { convert_to_json(blobs, @blob) }
>> +      format.json { convert_to_json(:blobs, @blob) }
>>        end
>>    else
>>        report_error(404)
>> diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
>> index a075f0a..bf5c6f5 100644
>> --- a/server/views/blobs/new.html.haml
>> +++ b/server/views/blobs/new.html.haml
>> @@ -3,13 +3,7 @@
>> %form{ :action =>  bucket_url(@bucket_id), :method =>  :post, :enctype =>
 'multipart/form-data'}
>>    %label
>>      Blob Name:
>> -    %input{ :name =>  'blob_id', :size =>  512}/
>> -  %label
>> -    Blob Data:
>> -    %br
>> -    %input{ :type =>  "file", :name =>  'blob_data', :size =>  50}/
>> -    %br
>> -    %br
>> +    %input{ :name =>  'blob', :size =>  512}/
>>    %input{ :type =>  "hidden", :name =>  "meta_params", :value =>  "0"}
>>    %a{ :href =>  "javascript:;", :onclick =>  "more_fields();"} Add Metadata
>>    %div{ :id =>  "metadata_holder", :style =>  "display: none;"}
>> @@ -23,4 +17,10 @@
>>    %a{ :href =>  "javascript:;", :onclick =>  "less_fields();"} Less Metadata
>>    %br
>>    %br
>> +  %label
>> +    Blob Data:
>> +    %br
>> +    %input{ :type =>  "file", :name =>  'blob_data', :size =>  50}/
>> +    %br
>> +    %br
>>    %input{ :type =>  :submit, :name =>  "commit", :value =>  "create"}/
>> --
>> 1.7.3.4
>>
>
> ------------------------------------------------------
> Michal Fojtik, mfojtik@redhat.com
> Deltacloud API: http://deltacloud.org
>


Mime
View raw message