Return-Path: X-Original-To: apmail-deltacloud-commits-archive@www.apache.org Delivered-To: apmail-deltacloud-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8CB96EA8B for ; Fri, 30 Nov 2012 12:30:12 +0000 (UTC) Received: (qmail 66754 invoked by uid 500); 30 Nov 2012 12:30:12 -0000 Delivered-To: apmail-deltacloud-commits-archive@deltacloud.apache.org Received: (qmail 66714 invoked by uid 500); 30 Nov 2012 12:30:11 -0000 Mailing-List: contact commits-help@deltacloud.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@deltacloud.apache.org Delivered-To: mailing list commits@deltacloud.apache.org Received: (qmail 66586 invoked by uid 99); 30 Nov 2012 12:30:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Nov 2012 12:30:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 23464816236; Fri, 30 Nov 2012 12:30:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marios@apache.org To: commits@deltacloud.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [2/3] git commit: Adds routes and helpers for segmented blob upload API Message-Id: <20121130123007.23464816236@tyr.zones.apache.org> Date: Fri, 30 Nov 2012 12:30:07 +0000 (UTC) Adds routes and helpers for segmented blob upload API http://mariosandreou.com/deltacloud/cloud_API/2012/10/08/segmenting-huge-blobs.html Project: http://git-wip-us.apache.org/repos/asf/deltacloud/repo Commit: http://git-wip-us.apache.org/repos/asf/deltacloud/commit/c71a728e Tree: http://git-wip-us.apache.org/repos/asf/deltacloud/tree/c71a728e Diff: http://git-wip-us.apache.org/repos/asf/deltacloud/diff/c71a728e Branch: refs/heads/master Commit: c71a728ec59cdb1eee6b849d9e1010274d59911c Parents: b4c5492 Author: marios Authored: Thu Oct 25 17:10:00 2012 +0300 Committer: marios Committed: Fri Nov 30 14:22:32 2012 +0200 ---------------------------------------------------------------------- server/lib/deltacloud/collections/buckets.rb | 35 +++++++++++- .../lib/deltacloud/helpers/blob_stream_helper.rb | 45 ++++++++++++++- 2 files changed, 76 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/deltacloud/blob/c71a728e/server/lib/deltacloud/collections/buckets.rb ---------------------------------------------------------------------- diff --git a/server/lib/deltacloud/collections/buckets.rb b/server/lib/deltacloud/collections/buckets.rb index c503926..90f2306 100644 --- a/server/lib/deltacloud/collections/buckets.rb +++ b/server/lib/deltacloud/collections/buckets.rb @@ -49,8 +49,41 @@ module Deltacloud::Collections end end + put "/segmented_blob_operation/:bucket/:blob" do + case BlobHelper.segmented_blob_op_type(request) + when "init" then + segmented_blob_id = driver.init_segmented_blob(credentials, {:id => params[:blob], + :bucket => params[:bucket]}) + headers["X-Deltacloud-SegmentedBlob"] = segmented_blob_id + status 204 + when "segment" then + if env["BLOB_SUCCESS"] # set by blob_stream_helper after succesful stream PUT + #segment already uploaded by blob_streaming_patch - setting blob_segment-id from the response + headers["X-Deltacloud-BlobSegmentId"] = request.env["BLOB_SEGMENT_ID"] # set in blob_stream_helper: 203 + status 204 + else + report_error(400) # likely blob size < 112 KB (didn't hit streaming patch) + end + when "finalize" then + bucket_id = params[:bucket] + blob_id = params[:blob] + segmented_blob_manifest = BlobHelper.extract_segmented_blob_manifest(request) + segmented_blob_id = BlobHelper.segmented_blob_id(request) + @blob = driver.create_blob(credentials, params[:bucket], params[:blob], nil, {:segment_manifest=>segmented_blob_manifest, :segmented_blob_id=>segmented_blob_id}) + respond_to do |format| + format.xml { haml :"blobs/show" } + format.html { haml :"blobs/show" } + format.json { xml_to_json 'blobs/show' } + end + else + report_error(500) + end + end + put "/buckets/:bucket/:blob" do - if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob + if BlobHelper.segmented_blob(request) + status, headers, body = call!(env.merge("PATH_INFO" => "/segmented_blob_operation/#{params[:bucket]}/#{params[:blob]}")) + elsif(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob content_type = env["CONTENT_TYPE"] content_type ||= "" @blob = driver.blob(credentials, {:id => params[:blob], http://git-wip-us.apache.org/repos/asf/deltacloud/blob/c71a728e/server/lib/deltacloud/helpers/blob_stream_helper.rb ---------------------------------------------------------------------- diff --git a/server/lib/deltacloud/helpers/blob_stream_helper.rb b/server/lib/deltacloud/helpers/blob_stream_helper.rb index dc1d230..88c6c4d 100644 --- a/server/lib/deltacloud/helpers/blob_stream_helper.rb +++ b/server/lib/deltacloud/helpers/blob_stream_helper.rb @@ -77,6 +77,43 @@ DELTACLOUD_BLOBMETA_HEADER = /HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i metadata.gsub_keys(DELTACLOUD_BLOBMETA_HEADER, rename_to) end + #in the following segment* methods, using context.env["QUERY_STRING"] rather than context.params so it works for both Thin and Sinatra request objects (streaming) + def self.segmented_blob(request_context) + return true if (request_context.env["HTTP_X_DELTACLOUD_BLOBTYPE"] == 'segmented' || request_context.env["QUERY_STRING"].match(/blob_type=segmented/)) + false + end + + def self.segment_order(request_context) + (request_context.env["HTTP_X_DELTACLOUD_SEGMENTORDER"] || request_context.env["QUERY_STRING"].match(/segment_order=(\w)*/){|m| m[0].split("=").pop}) + end + + def self.segmented_blob_id(request_context) + (request_context.env["HTTP_X_DELTACLOUD_SEGMENTEDBLOB"] || request_context.env["QUERY_STRING"].match(/segmented_blob=(\w)*/){|m| m[0].split("=").pop}) + end + + def self.segmented_blob_op_type(request_context) + is_segmented = segmented_blob(request_context) + blob_id = segmented_blob_id(request_context) + segment_order = segment_order(request_context) + #if blob_type=segmented AND segmented_blob_id AND segment_order then it is a "SEGMENT" + #if blob_type=segmented AND segmented_blob_id then it is a "FINALIZE" + #if blob_type=segmented then it is "INIT" + return "segment" if (is_segmented && blob_id && segment_order) + return "finalize" if (is_segmented && blob_id) + return "init" if is_segmented + nil # should explode something instead + end + + #in "1=abc , 2=def , 3=ghi" + #out {"1"=>"abc", "2"=>"def", "3"=>"ghi"} + def self.extract_segmented_blob_manifest(request) + manifest_hash = request.body.read.split(",").inject({}) do |res,current| + k,v=current.strip.split("=") + res[k]=v + res + end + end + end #Monkey patch for streaming blobs: @@ -158,11 +195,10 @@ class BlobStreamIO @content_length = request.env['CONTENT_LENGTH'] @http, provider_request = Deltacloud::API.driver.blob_stream_connection({:user=>user, :password=>password, :bucket=>bucket, :blob=>blob, :metadata=> user_meta, - :content_type=>content_type, :content_length=>@content_length }) + :content_type=>content_type, :content_length=>@content_length, :context=>request }) @content_length = @content_length.to_i #for comparison of size in '<< (data)' @sock = @http.request(provider_request, nil, true) end - def << (data) @sock.write(data) @size += data.length @@ -170,6 +206,9 @@ class BlobStreamIO result = @http.end_request if result.is_a?(Net::HTTPSuccess) @client_request.env["BLOB_SUCCESS"] = "true" + if BlobHelper.segmented_blob_op_type(@client_request) == "segment" + @client_request.env["BLOB_SEGMENT_ID"] = Deltacloud::API.driver.blob_segment_id(@client_request, result) + end else @client_request.env["BLOB_FAIL"] = result.body end @@ -195,7 +234,7 @@ class BlobStreamIO private def parse_bucket_blob(request_string) - array = request_string.split("/") + array = request_string.gsub(/(&\w*=\w*)*$/, "").split("/") blob = array.pop bucket = array.pop return bucket, blob