deltacloud-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mar...@apache.org
Subject [2/3] git commit: Adds routes and helpers for segmented blob upload API
Date Fri, 30 Nov 2012 12:30:07 GMT
Adds routes and helpers for segmented blob upload API

http://mariosandreou.com/deltacloud/cloud_API/2012/10/08/segmenting-huge-blobs.html


Project: http://git-wip-us.apache.org/repos/asf/deltacloud/repo
Commit: http://git-wip-us.apache.org/repos/asf/deltacloud/commit/c71a728e
Tree: http://git-wip-us.apache.org/repos/asf/deltacloud/tree/c71a728e
Diff: http://git-wip-us.apache.org/repos/asf/deltacloud/diff/c71a728e

Branch: refs/heads/master
Commit: c71a728ec59cdb1eee6b849d9e1010274d59911c
Parents: b4c5492
Author: marios <marios@redhat.com>
Authored: Thu Oct 25 17:10:00 2012 +0300
Committer: marios <marios@redhat.com>
Committed: Fri Nov 30 14:22:32 2012 +0200

----------------------------------------------------------------------
 server/lib/deltacloud/collections/buckets.rb       |   35 +++++++++++-
 .../lib/deltacloud/helpers/blob_stream_helper.rb   |   45 ++++++++++++++-
 2 files changed, 76 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/deltacloud/blob/c71a728e/server/lib/deltacloud/collections/buckets.rb
----------------------------------------------------------------------
diff --git a/server/lib/deltacloud/collections/buckets.rb b/server/lib/deltacloud/collections/buckets.rb
index c503926..90f2306 100644
--- a/server/lib/deltacloud/collections/buckets.rb
+++ b/server/lib/deltacloud/collections/buckets.rb
@@ -49,8 +49,41 @@ module Deltacloud::Collections
       end
     end
 
+    put "/segmented_blob_operation/:bucket/:blob" do
+      case BlobHelper.segmented_blob_op_type(request)
+        when "init" then
+          segmented_blob_id = driver.init_segmented_blob(credentials, {:id => params[:blob],
+                                            :bucket => params[:bucket]})
+          headers["X-Deltacloud-SegmentedBlob"] = segmented_blob_id
+          status 204
+        when "segment" then
+          if env["BLOB_SUCCESS"] # set by blob_stream_helper after succesful stream PUT
+            #segment already uploaded by blob_streaming_patch - setting blob_segment-id from
the response
+            headers["X-Deltacloud-BlobSegmentId"] = request.env["BLOB_SEGMENT_ID"] # set
in blob_stream_helper: 203
+            status 204
+          else
+            report_error(400) # likely blob size < 112 KB (didn't hit streaming patch)
+          end
+        when "finalize" then
+          bucket_id = params[:bucket]
+          blob_id = params[:blob]
+          segmented_blob_manifest = BlobHelper.extract_segmented_blob_manifest(request)
+          segmented_blob_id = BlobHelper.segmented_blob_id(request)
+          @blob = driver.create_blob(credentials, params[:bucket], params[:blob], nil, {:segment_manifest=>segmented_blob_manifest,
:segmented_blob_id=>segmented_blob_id})
+          respond_to do |format|
+            format.xml { haml :"blobs/show" }
+            format.html { haml :"blobs/show" }
+            format.json { xml_to_json 'blobs/show' }
+          end
+        else
+          report_error(500)
+      end
+    end
+
     put "/buckets/:bucket/:blob" do
-      if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+      if BlobHelper.segmented_blob(request)
+        status, headers, body = call!(env.merge("PATH_INFO" => "/segmented_blob_operation/#{params[:bucket]}/#{params[:blob]}"))
+      elsif(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
         content_type = env["CONTENT_TYPE"]
         content_type ||=  ""
         @blob = driver.blob(credentials, {:id => params[:blob],

http://git-wip-us.apache.org/repos/asf/deltacloud/blob/c71a728e/server/lib/deltacloud/helpers/blob_stream_helper.rb
----------------------------------------------------------------------
diff --git a/server/lib/deltacloud/helpers/blob_stream_helper.rb b/server/lib/deltacloud/helpers/blob_stream_helper.rb
index dc1d230..88c6c4d 100644
--- a/server/lib/deltacloud/helpers/blob_stream_helper.rb
+++ b/server/lib/deltacloud/helpers/blob_stream_helper.rb
@@ -77,6 +77,43 @@ DELTACLOUD_BLOBMETA_HEADER = /HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i
     metadata.gsub_keys(DELTACLOUD_BLOBMETA_HEADER, rename_to)
   end
 
+  #in the following segment* methods, using context.env["QUERY_STRING"] rather than context.params
so it works for both Thin and Sinatra request objects (streaming)
+  def self.segmented_blob(request_context)
+    return true if (request_context.env["HTTP_X_DELTACLOUD_BLOBTYPE"] == 'segmented' || request_context.env["QUERY_STRING"].match(/blob_type=segmented/))
+    false
+  end
+
+  def self.segment_order(request_context)
+    (request_context.env["HTTP_X_DELTACLOUD_SEGMENTORDER"] || request_context.env["QUERY_STRING"].match(/segment_order=(\w)*/){|m|
m[0].split("=").pop})
+  end
+
+  def self.segmented_blob_id(request_context)
+    (request_context.env["HTTP_X_DELTACLOUD_SEGMENTEDBLOB"] || request_context.env["QUERY_STRING"].match(/segmented_blob=(\w)*/){|m|
m[0].split("=").pop})
+  end
+
+  def self.segmented_blob_op_type(request_context)
+    is_segmented = segmented_blob(request_context)
+    blob_id = segmented_blob_id(request_context)
+    segment_order = segment_order(request_context)
+    #if blob_type=segmented AND segmented_blob_id AND segment_order then it is a "SEGMENT"
+    #if blob_type=segmented AND segmented_blob_id then it is a "FINALIZE"
+    #if blob_type=segmented then it is "INIT"
+    return "segment" if (is_segmented && blob_id && segment_order)
+    return "finalize" if (is_segmented && blob_id)
+    return "init" if is_segmented
+    nil # should explode something instead
+  end
+
+  #in "1=abc , 2=def , 3=ghi"
+  #out {"1"=>"abc", "2"=>"def", "3"=>"ghi"}
+  def self.extract_segmented_blob_manifest(request)
+    manifest_hash = request.body.read.split(",").inject({}) do |res,current|
+      k,v=current.strip.split("=")
+      res[k]=v
+      res
+    end
+  end
+
 end
 
 #Monkey patch for streaming blobs:
@@ -158,11 +195,10 @@ class BlobStreamIO
     @content_length = request.env['CONTENT_LENGTH']
     @http, provider_request = Deltacloud::API.driver.blob_stream_connection({:user=>user,
        :password=>password, :bucket=>bucket, :blob=>blob, :metadata=> user_meta,
-       :content_type=>content_type, :content_length=>@content_length })
+       :content_type=>content_type, :content_length=>@content_length, :context=>request
})
     @content_length = @content_length.to_i #for comparison of size in '<< (data)'
     @sock = @http.request(provider_request, nil, true)
   end
-
   def << (data)
     @sock.write(data)
     @size += data.length
@@ -170,6 +206,9 @@ class BlobStreamIO
       result = @http.end_request
       if result.is_a?(Net::HTTPSuccess)
         @client_request.env["BLOB_SUCCESS"] = "true"
+        if BlobHelper.segmented_blob_op_type(@client_request) == "segment"
+          @client_request.env["BLOB_SEGMENT_ID"] = Deltacloud::API.driver.blob_segment_id(@client_request,
result)
+        end
       else
         @client_request.env["BLOB_FAIL"] = result.body
       end
@@ -195,7 +234,7 @@ class BlobStreamIO
   private
 
   def parse_bucket_blob(request_string)
-    array = request_string.split("/")
+    array = request_string.gsub(/(&\w*=\w*)*$/, "").split("/")
     blob = array.pop
     bucket = array.pop
     return bucket, blob


Mime
View raw message