Return-Path: X-Original-To: apmail-incubator-deltacloud-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-deltacloud-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 84B2748C9 for ; Fri, 10 Jun 2011 16:13:58 +0000 (UTC) Received: (qmail 94213 invoked by uid 500); 10 Jun 2011 16:13:58 -0000 Delivered-To: apmail-incubator-deltacloud-commits-archive@incubator.apache.org Received: (qmail 94187 invoked by uid 500); 10 Jun 2011 16:13:58 -0000 Mailing-List: contact deltacloud-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: deltacloud-dev@incubator.apache.org Delivered-To: mailing list deltacloud-commits@incubator.apache.org Received: (qmail 94180 invoked by uid 99); 10 Jun 2011 16:13:58 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jun 2011 16:13:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jun 2011 16:13:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D9AAC238896F; Fri, 10 Jun 2011 16:13:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1134365 - in /incubator/deltacloud/trunk/server: lib/deltacloud/drivers/ec2/ec2_driver.rb lib/deltacloud/helpers/blob_stream.rb server.rb views/blobs/new.html.haml Date: Fri, 10 Jun 2011 16:13:32 -0000 To: deltacloud-commits@incubator.apache.org From: marios@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110610161332.D9AAC238896F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: marios Date: Fri Jun 10 16:13:32 2011 New Revision: 1134365 URL: http://svn.apache.org/viewvc?rev=1134365&view=rev Log: Adds blob streaming uploads using PUT (stream through thin+deltacloud) Signed-off-by: marios Modified: incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb incubator/deltacloud/trunk/server/server.rb incubator/deltacloud/trunk/server/views/blobs/new.html.haml Modified: incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb?rev=1134365&r1=1134364&r2=1134365&view=diff ============================================================================== --- incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb (original) +++ incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb Fri Jun 10 16:13:32 2011 @@ -396,7 +396,7 @@ module Deltacloud end #-- - # Create Blob + # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data) #-- def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {}) s3_client = new_client(credentials, :s3) @@ -445,6 +445,39 @@ module Deltacloud end end + #params: {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata} + def blob_stream_connection(params) + #canonicalise metadata: + #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html + metadata = params[:metadata] + signature_meta_string = "" + unless metadata.nil? + metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 'x-amz-meta-') + keys_array = metadata.keys.sort! + keys_array.each {|k| signature_meta_string << "#{k}:#{metadata[k]}\n"} + end + provider = "https://#{endpoint_for_service(:s3)}" + uri = URI.parse(provider) + http = Net::HTTP.new("#{params[:bucket]}.#{uri.host}", uri.port ) + http.use_ssl = true + http.verify_mode = OpenSSL::SSL::VERIFY_NONE + timestamp = Time.now.httpdate + string_to_sign = + "PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}" + auth_string = Aws::Utils::sign(params[:password], string_to_sign) + request = Net::HTTP::Put.new("/#{params[:blob]}") + request['Host'] = "#{params[:bucket]}.#{uri.host}" + request['Date'] = timestamp + request['Content-Type'] = params[:content_type] + request['Content-Length'] = params[:content_length] + request['Authorization'] = "AWS #{params[:user]}:#{auth_string}" + request['Expect'] = "100-continue" + unless metadata.nil? + metadata.each{|k,v| request[k] = v} + end + return http, request + end + def storage_volumes(credentials, opts={}) ec2 = new_client( credentials ) volume_list = (opts and opts[:id]) ? opts[:id] : nil @@ -582,7 +615,6 @@ module Deltacloud end private - def new_client(credentials, type = :ec2) klass = case type when :elb then Aws::Elb Modified: incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb?rev=1134365&r1=1134364&r2=1134365&view=diff ============================================================================== --- incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb (original) +++ incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb Fri Jun 10 16:13:32 2011 @@ -69,13 +69,145 @@ class Hash remove = [] self.each_key do |key| if key.to_s.match(rgx_pattern) - new_key = key.to_s.gsub(rgx_pattern, replacement) + new_key = key.to_s.gsub(rgx_pattern, replacement).downcase self[new_key] = self[key] remove << key - end #key.match - end # each_key do + end + end #remove the original keys self.delete_if{|k,v| remove.include?(k)} - end #def + end -end #class +end + +#Monkey patch for streaming blobs: +# Normally a client will upload a blob to deltacloud and thin will put +# this into a tempfile. Then deltacloud would stream up to the provider: +# i.e. client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider +# Instead we want to recognise that this is a 'PUT blob' operation and +# start streaming to the provider as the request is received: +# i.e. client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider +module Thin + class Request + + alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if defined?(Thin::Response) + private + def move_body_to_tempfile + if BlobStreamIO::is_put_blob(self) + @body = BlobStreamIO.new(self) + else + move_body_to_tempfile_orig + end + end + + end +end + +require 'net/http' +#monkey patch for Net:HTTP +module Net + class HTTP + + alias :request_orig :request + + def request(req, body = nil, blob_stream = nil, &block) + unless blob_stream + return request_orig(req, body, &block) + end + @blob_req = req + do_start #start the connection + + req.set_body_internal body + begin_transport req + req.write_header_m @socket,@curr_http_version, edit_path(req.path) + @socket + end + + class Put < HTTPRequest + def write_header_m(sock, ver, path) + write_header(sock, ver, path) + end + end + + def end_request + begin + res = HTTPResponse.read_new(@socket) + end while res.kind_of?(HTTPContinue) + res.reading_body(@socket, @blob_req.response_body_permitted?) { + yield res if block_given? } + end_transport @blob_req, res + do_finish + res + end + end + +end + +require 'base64' +class BlobStreamIO + + attr_accessor :size, :provider, :sock + + def initialize(request) + @client_request = request + @size = 0 + bucket, blob = parse_bucket_blob(request.env["PATH_INFO"]) + user, password = parse_credentials(request.env['HTTP_AUTHORIZATION']) + content_type = request.env['CONTENT_TYPE'] || "" + #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value) + meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)} + user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase] = array.last; result} + @content_length = request.env['CONTENT_LENGTH'] + @http, provider_request = driver.blob_stream_connection({:user=>user, + :password=>password, :bucket=>bucket, :blob=>blob, :metadata=> user_meta, + :content_type=>content_type, :content_length=>@content_length }) + @content_length = @content_length.to_i #for comparison of size in '<< (data)' + @sock = @http.request(provider_request, nil, true) + end + + def << (data) + @sock.write(data) + @size += data.length + if (@size >= @content_length) + result = @http.end_request + if result.is_a?(Net::HTTPSuccess) + @client_request.env["BLOB_SUCCESS"] = "true" + else + @client_request.env["BLOB_FAIL"] = result.body + end + end + end + + def rewind + end + + #use the Request.env hash (populated by the ThinParser) to determine whether + #this is a post blob operation. By definition, only get here with a body of + # > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32) + def self.is_put_blob(request = nil) + path = request.env['PATH_INFO'] + method = request.env['REQUEST_METHOD'] + if ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/ && method == 'PUT' ) + return true + else + return false + end + end + + private + + def parse_bucket_blob(request_string) + array = request_string.split("/") + blob = array.pop + bucket = array.pop + return bucket, blob + end + + def parse_credentials(request_string) + decoded = Base64.decode64(request_string.split('Basic ').last) + key = decoded.split(':').first + pass = decoded.split(':').last + return key, pass + end + +end Modified: incubator/deltacloud/trunk/server/server.rb URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/server.rb?rev=1134365&r1=1134364&r2=1134365&view=diff ============================================================================== --- incubator/deltacloud/trunk/server/server.rb (original) +++ incubator/deltacloud/trunk/server/server.rb Fri Jun 10 16:13:32 2011 @@ -704,13 +704,47 @@ get "#{Sinatra::UrlForHelper::DEFAULT_UR end end -#create a new blob +#create a new blob using PUT - streams through deltacloud +put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do + if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob + content_type = env["CONTENT_TYPE"] + content_type ||= "" + @blob = driver.blob(credentials, {:id => params[:blob], + 'bucket' => params[:bucket]}) + respond_to do |format| + format.html { haml :"blobs/show" } + format.xml { haml :"blobs/show" } + format.json { convert_to_json(:blobs, @blob) } + end + elsif(env["BLOB_FAIL"]) + report_error(500) #OK? + else # small blobs - < 112kb dont hit the streaming monkey patch - use 'normal' create_blob + # also, if running under webrick don't hit the streaming patch (Thin specific) + bucket_id = params[:bucket] + blob_id = params[:blob] + temp_file = Tempfile.new("temp_blob_file") + temp_file.write(env['rack.input'].read) + temp_file.flush + content_type = env['CONTENT_TYPE'] || "" + blob_data = {:tempfile => temp_file, :type => content_type} + meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)} + user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase] = array.last; result} + @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta) + temp_file.delete + respond_to do |format| + format.html { haml :"blobs/show"} + format.xml { haml :"blobs/show" } + end + end +end + +#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data) post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do bucket_id = params[:bucket] - blob_id = params['blob_id'] + blob_id = params['blob'] blob_data = params['blob_data'] user_meta = {} -#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser) + #metadata from params (i.e., passed by http form post, e.g. browser) max = params[:meta_params] if(max) (1..max.to_i).each do |i| @@ -718,11 +752,8 @@ post "#{Sinatra::UrlForHelper::DEFAULT_U key = "HTTP_X_Deltacloud_Blobmeta_#{key}" value = params[:"meta_value#{i}"] user_meta[key] = value - end #max.each do - else #can try to get blob_metadata from http headers - meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)} - meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last} - end #end if + end + end @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta) respond_to do |format| format.html { haml :"blobs/show"} @@ -738,7 +769,7 @@ delete "#{Sinatra::UrlForHelper::DEFAULT respond_to do |format| format.xml { 204 } format.json { 204 } - format.html { bucket_url(bucket_id) } + format.html { redirect(bucket_url(bucket_id)) } end end @@ -776,7 +807,7 @@ get "#{Sinatra::UrlForHelper::DEFAULT_UR respond_to do |format| format.html { haml :"blobs/show" } format.xml { haml :"blobs/show" } - format.json { convert_to_json(blobs, @blob) } + format.json { convert_to_json(:blobs, @blob) } end else report_error(404) Modified: incubator/deltacloud/trunk/server/views/blobs/new.html.haml URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/views/blobs/new.html.haml?rev=1134365&r1=1134364&r2=1134365&view=diff ============================================================================== --- incubator/deltacloud/trunk/server/views/blobs/new.html.haml (original) +++ incubator/deltacloud/trunk/server/views/blobs/new.html.haml Fri Jun 10 16:13:32 2011 @@ -3,13 +3,7 @@ %form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => 'multipart/form-data'} %label Blob Name: - %input{ :name => 'blob_id', :size => 512}/ - %label - Blob Data: - %br - %input{ :type => "file", :name => 'blob_data', :size => 50}/ - %br - %br + %input{ :name => 'blob', :size => 512}/ %input{ :type => "hidden", :name => "meta_params", :value => "0"} %a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata %div{ :id => "metadata_holder", :style => "display: none;"} @@ -23,4 +17,10 @@ %a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata %br %br + %label + Blob Data: + %br + %input{ :type => "file", :name => 'blob_data', :size => 50}/ + %br + %br %input{ :type => :submit, :name => "commit", :value => "create"}/