Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9AF6210BBE for ; Wed, 17 Dec 2014 22:59:57 +0000 (UTC) Received: (qmail 4887 invoked by uid 500); 17 Dec 2014 22:59:49 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 4575 invoked by uid 500); 17 Dec 2014 22:59:49 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 3460 invoked by uid 99); 17 Dec 2014 22:59:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Dec 2014 22:59:48 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 43ED58C4016; Wed, 17 Dec 2014 22:59:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cnauroth@apache.org To: common-commits@hadoop.apache.org Date: Wed, 17 Dec 2014 23:00:00 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/24] hadoop git commit: HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell. http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java new file mode 100644 index 0000000..95f0c22 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java @@ -0,0 +1,497 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE; +import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE; +import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE; +import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.fromShort; +import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.annotations.VisibleForTesting; +import com.microsoft.windowsazure.storage.OperationContext; +import com.microsoft.windowsazure.storage.StorageException; +import com.microsoft.windowsazure.storage.blob.BlobRequestOptions; + + +/** + * An output stream that write file data to a page blob stored using ASV's + * custom format. + */ +final class PageBlobOutputStream extends OutputStream implements Syncable { + /** + * The maximum number of raw bytes Azure Storage allows us to upload in a + * single request (4 MB). + */ + private static final int MAX_RAW_BYTES_PER_REQUEST = 4 * 1024 * 1024; + /** + * The maximum number of pages Azure Storage allows us to upload in a + * single request. + */ + private static final int MAX_PAGES_IN_REQUEST = + MAX_RAW_BYTES_PER_REQUEST / PAGE_SIZE; + /** + * The maximum number of data bytes (header not included) we can upload + * in a single request. I'm limiting it to (N - 1) pages to account for + * the possibility that we may have to rewrite the previous request's + * last page. + */ + private static final int MAX_DATA_BYTES_PER_REQUEST = + PAGE_DATA_SIZE * (MAX_PAGES_IN_REQUEST - 1); + + private final CloudPageBlobWrapper blob; + private final OperationContext opContext; + + /** + * If the IO thread encounters an error, it'll store it here. + */ + private volatile IOException lastError; + + /** + * The current byte offset we're at in the blob (how many bytes we've + * uploaded to the server). + */ + private long currentBlobOffset; + /** + * The data in the last page that we wrote to the server, in case we have to + * overwrite it in the new request. + */ + private byte[] previousLastPageDataWritten = new byte[0]; + /** + * The current buffer we're writing to before sending to the server. + */ + private ByteArrayOutputStream outBuffer; + /** + * The task queue for writing to the server. + */ + private final LinkedBlockingQueue ioQueue; + /** + * The thread pool we're using for writing to the server. Note that the IO + * write is NOT designed for parallelism, so there can only be one thread + * in that pool (I'm using the thread pool mainly for the lifetime management + * capabilities, otherwise I'd have just used a simple Thread). + */ + private final ThreadPoolExecutor ioThreadPool; + + // The last task given to the ioThreadPool to execute, to allow + // waiting until it's done. + private WriteRequest lastQueuedTask; + + public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class); + + // Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB. + // This default block size is often used as the hbase.regionserver.hlog.blocksize. + // The goal is to have a safe minimum size for HBase log files to allow them + // to be filled and rolled without exceeding the minimum size. A larger size can be + // used by setting the fs.azure.page.blob.size configuration variable. + public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L; + + /** + * Constructs an output stream over the given page blob. + * + * @param blob the blob that this stream is associated with. + * @param opContext an object used to track the execution of the operation + * @throws StorageException if anything goes wrong creating the blob. + */ + public PageBlobOutputStream(final CloudPageBlobWrapper blob, + final OperationContext opContext, + final Configuration conf) throws StorageException { + this.blob = blob; + this.outBuffer = new ByteArrayOutputStream(); + this.opContext = opContext; + this.lastQueuedTask = null; + this.ioQueue = new LinkedBlockingQueue(); + + // As explained above: the IO writes are not designed for parallelism, + // so we only have one thread in this thread pool. + this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, + ioQueue); + + + + // Make page blob files have a size that is the greater of a + // minimum size, or the value of fs.azure.page.blob.size from configuration. + long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0); + LOG.debug("Read value of fs.azure.page.blob.size as " + pageBlobConfigSize + + " from configuration (0 if not present)."); + long pageBlobSize = Math.max(PAGE_BLOB_MIN_SIZE, pageBlobConfigSize); + + // Ensure that the pageBlobSize is a multiple of page size. + if (pageBlobSize % PAGE_SIZE != 0) { + pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE; + } + blob.create(pageBlobSize, new BlobRequestOptions(), opContext); + } + + private void checkStreamState() throws IOException { + if (lastError != null) { + throw lastError; + } + } + + /** + * Closes this output stream and releases any system resources associated with + * this stream. If any data remains in the buffer it is committed to the + * service. + */ + @Override + public void close() throws IOException { + LOG.debug("Closing page blob output stream."); + flush(); + checkStreamState(); + ioThreadPool.shutdown(); + try { + LOG.debug(ioThreadPool.toString()); + if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) { + LOG.debug("Timed out after 10 minutes waiting for IO requests to finish"); + logAllStackTraces(); + LOG.debug(ioThreadPool.toString()); + throw new IOException("Timed out waiting for IO requests to finish"); + } + } catch (InterruptedException e) { + LOG.debug("Caught InterruptedException"); + + // Restore the interrupted status + Thread.currentThread().interrupt(); + } + + this.lastError = new IOException("Stream is already closed."); + } + + // Log the stacks of all threads. + private void logAllStackTraces() { + Map liveThreads = Thread.getAllStackTraces(); + for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) { + Thread key = (Thread) i.next(); + LOG.debug("Thread " + key.getName()); + StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key); + for (int j = 0; j < trace.length; j++) { + LOG.debug("\tat " + trace[j]); + } + } + } + + /** + * A single write request for data to write to Azure storage. + */ + private class WriteRequest implements Runnable { + private final byte[] dataPayload; + private final CountDownLatch doneSignal = new CountDownLatch(1); + + public WriteRequest(byte[] dataPayload) { + this.dataPayload = dataPayload; + } + + public void waitTillDone() throws InterruptedException { + doneSignal.await(); + } + + @Override + public void run() { + try { + LOG.debug("before runInternal()"); + runInternal(); + LOG.debug("after runInternal()"); + } finally { + doneSignal.countDown(); + } + } + + private void runInternal() { + if (lastError != null) { + // We're already in an error state, no point doing anything. + return; + } + if (dataPayload.length == 0) { + // Nothing to do. + return; + } + + // Since we have to rewrite the last request's last page's data + // (may be empty), total data size is our data plus whatever was + // left from there. + final int totalDataBytes = dataPayload.length + + previousLastPageDataWritten.length; + // Calculate the total number of pages we're writing to the server. + final int numberOfPages = (totalDataBytes / PAGE_DATA_SIZE) + + (totalDataBytes % PAGE_DATA_SIZE == 0 ? 0 : 1); + // Fill up the raw bytes we're writing. + byte[] rawPayload = new byte[numberOfPages * PAGE_SIZE]; + // Keep track of the size of the last page we uploaded. + int currentLastPageDataSize = -1; + for (int page = 0; page < numberOfPages; page++) { + // Our current byte offset in the data. + int dataOffset = page * PAGE_DATA_SIZE; + // Our current byte offset in the raw buffer. + int rawOffset = page * PAGE_SIZE; + // The size of the data in the current page. + final short currentPageDataSize = (short) Math.min(PAGE_DATA_SIZE, + totalDataBytes - dataOffset); + // Save off this page's size as the potential last page's size. + currentLastPageDataSize = currentPageDataSize; + + // Write out the page size in the header. + final byte[] header = fromShort(currentPageDataSize); + System.arraycopy(header, 0, rawPayload, rawOffset, header.length); + rawOffset += header.length; + + int bytesToCopyFromDataPayload = currentPageDataSize; + if (dataOffset < previousLastPageDataWritten.length) { + // First write out the last page's data. + final int bytesToCopyFromLastPage = Math.min(currentPageDataSize, + previousLastPageDataWritten.length - dataOffset); + System.arraycopy(previousLastPageDataWritten, dataOffset, + rawPayload, rawOffset, bytesToCopyFromLastPage); + bytesToCopyFromDataPayload -= bytesToCopyFromLastPage; + rawOffset += bytesToCopyFromLastPage; + dataOffset += bytesToCopyFromLastPage; + } + + if (dataOffset >= previousLastPageDataWritten.length) { + // Then write the current payload's data. + System.arraycopy(dataPayload, + dataOffset - previousLastPageDataWritten.length, + rawPayload, rawOffset, bytesToCopyFromDataPayload); + } + } + + // Raw payload constructed, ship it off to the server. + writePayloadToServer(rawPayload); + + // Post-send bookkeeping. + currentBlobOffset += rawPayload.length; + if (currentLastPageDataSize < PAGE_DATA_SIZE) { + // Partial page, save it off so it's overwritten in the next request. + final int startOffset = (numberOfPages - 1) * PAGE_SIZE + PAGE_HEADER_SIZE; + previousLastPageDataWritten = Arrays.copyOfRange(rawPayload, + startOffset, + startOffset + currentLastPageDataSize); + // Since we're rewriting this page, set our current offset in the server + // to that page's beginning. + currentBlobOffset -= PAGE_SIZE; + } else { + // It wasn't a partial page, we won't need to rewrite it. + previousLastPageDataWritten = new byte[0]; + } + } + + /** + * Writes the given raw payload to Azure Storage at the current blob + * offset. + */ + private void writePayloadToServer(byte[] rawPayload) { + final ByteArrayInputStream wrapperStream = + new ByteArrayInputStream(rawPayload); + LOG.debug("writing payload of " + rawPayload.length + " bytes to Azure page blob"); + try { + long start = System.currentTimeMillis(); + blob.uploadPages(wrapperStream, currentBlobOffset, rawPayload.length, + withMD5Checking(), PageBlobOutputStream.this.opContext); + long end = System.currentTimeMillis(); + LOG.trace("Azure uploadPages time for " + rawPayload.length + " bytes = " + (end - start)); + } catch (IOException ex) { + LOG.debug(ExceptionUtils.getStackTrace(ex)); + lastError = ex; + } catch (StorageException ex) { + LOG.debug(ExceptionUtils.getStackTrace(ex)); + lastError = new IOException(ex); + } + if (lastError != null) { + LOG.debug("Caught error in PageBlobOutputStream#writePayloadToServer()"); + } + } + } + + private synchronized void flushIOBuffers() { + if (outBuffer.size() == 0) { + return; + } + lastQueuedTask = new WriteRequest(outBuffer.toByteArray()); + ioThreadPool.execute(lastQueuedTask); + outBuffer = new ByteArrayOutputStream(); + } + + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the buffer it is committed to the + * service. Data is queued for writing but not forced out to the service + * before the call returns. + */ + @Override + public void flush() throws IOException { + checkStreamState(); + flushIOBuffers(); + } + + /** + * Writes b.length bytes from the specified byte array to this output stream. + * + * @param data + * the byte array to write. + * + * @throws IOException + * if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + @Override + public void write(final byte[] data) throws IOException { + write(data, 0, data.length); + } + + /** + * Writes length bytes from the specified byte array starting at offset to + * this output stream. + * + * @param data + * the byte array to write. + * @param offset + * the start offset in the data. + * @param length + * the number of bytes to write. + * @throws IOException + * if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + @Override + public void write(final byte[] data, final int offset, final int length) + throws IOException { + if (offset < 0 || length < 0 || length > data.length - offset) { + throw new IndexOutOfBoundsException(); + } + + writeInternal(data, offset, length); + } + + /** + * Writes the specified byte to this output stream. The general contract for + * write is that one byte is written to the output stream. The byte to be + * written is the eight low-order bits of the argument b. The 24 high-order + * bits of b are ignored. + * + * @param byteVal + * the byteValue to write. + * @throws IOException + * if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + @Override + public void write(final int byteVal) throws IOException { + write(new byte[] { (byte) (byteVal & 0xFF) }); + } + + /** + * Writes the data to the buffer and triggers writes to the service as needed. + * + * @param data + * the byte array to write. + * @param offset + * the start offset in the data. + * @param length + * the number of bytes to write. + * @throws IOException + * if an I/O error occurs. In particular, an IOException may be + * thrown if the output stream has been closed. + */ + private synchronized void writeInternal(final byte[] data, int offset, + int length) throws IOException { + while (length > 0) { + checkStreamState(); + final int availableBufferBytes = MAX_DATA_BYTES_PER_REQUEST + - this.outBuffer.size(); + final int nextWrite = Math.min(availableBufferBytes, length); + + outBuffer.write(data, offset, nextWrite); + offset += nextWrite; + length -= nextWrite; + + if (outBuffer.size() > MAX_DATA_BYTES_PER_REQUEST) { + throw new RuntimeException("Internal error: maximum write size " + + Integer.toString(MAX_DATA_BYTES_PER_REQUEST) + "exceeded."); + } + + if (outBuffer.size() == MAX_DATA_BYTES_PER_REQUEST) { + flushIOBuffers(); + } + } + } + + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. + */ + @Override + public synchronized void hsync() throws IOException { + LOG.debug("Entering PageBlobOutputStream#hsync()."); + long start = System.currentTimeMillis(); + flush(); + LOG.debug(ioThreadPool.toString()); + try { + if (lastQueuedTask != null) { + lastQueuedTask.waitTillDone(); + } + } catch (InterruptedException e1) { + + // Restore the interrupted status + Thread.currentThread().interrupt(); + } + LOG.debug("Leaving PageBlobOutputStream#hsync(). Total hsync duration = " + + (System.currentTimeMillis() - start) + " msec."); + } + + @Override + + public void hflush() throws IOException { + + // hflush is required to force data to storage, so call hsync, + // which does that. + hsync(); + } + + @Deprecated + public void sync() throws IOException { + + // Sync has been deprecated in favor of hflush. + hflush(); + } + + // For unit testing purposes: kill the IO threads. + @VisibleForTesting + void killIoThreads() { + ioThreadPool.shutdownNow(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java index 9e49de8..4a80d2e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java @@ -30,7 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience; * This listing may be returned in chunks, so a priorLastKey is * provided so that the next chunk may be requested. *

- * + * * @see NativeFileSystemStore#list(String, int, String) */ @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java new file mode 100644 index 0000000..2d5c0c8 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper; + +import com.microsoft.windowsazure.storage.AccessCondition; +import com.microsoft.windowsazure.storage.StorageException; +import com.microsoft.windowsazure.storage.blob.CloudBlob; + +/** + * An Azure blob lease that automatically renews itself indefinitely + * using a background thread. Use it to synchronize distributed processes, + * or to prevent writes to the blob by other processes that don't + * have the lease. + * + * Creating a new Lease object blocks the caller until the Azure blob lease is + * acquired. + * + * Attempting to get a lease on a non-existent blob throws StorageException. + * + * Call free() to release the Lease. + * + * You can use this Lease like a distributed lock. If the holder process + * dies, the lease will time out since it won't be renewed. + */ +public class SelfRenewingLease { + + private CloudBlobWrapper blobWrapper; + private Thread renewer; + private volatile boolean leaseFreed; + private String leaseID = null; + private static final int LEASE_TIMEOUT = 60; // Lease timeout in seconds + + // Time to wait to renew lease in milliseconds + public static final int LEASE_RENEWAL_PERIOD = 40000; + private static final Log LOG = LogFactory.getLog(SelfRenewingLease.class); + + // Used to allocate thread serial numbers in thread name + private static volatile int threadNumber = 0; + + + // Time to wait to retry getting the lease in milliseconds + private static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000; + + public SelfRenewingLease(CloudBlobWrapper blobWrapper) + throws StorageException { + + this.leaseFreed = false; + this.blobWrapper = blobWrapper; + + // Keep trying to get the lease until you get it. + CloudBlob blob = blobWrapper.getBlob(); + while(leaseID == null) { + try { + leaseID = blob.acquireLease(LEASE_TIMEOUT, null); + } catch (StorageException e) { + + // Throw again if we don't want to keep waiting. + // We expect it to be that the lease is already present, + // or in some cases that the blob does not exist. + if (!e.getErrorCode().equals("LeaseAlreadyPresent")) { + LOG.info( + "Caught exception when trying to get lease on blob " + + blobWrapper.getUri().toString() + ". " + e.getMessage()); + throw e; + } + } + if (leaseID == null) { + try { + Thread.sleep(LEASE_ACQUIRE_RETRY_INTERVAL); + } catch (InterruptedException e) { + + // Restore the interrupted status + Thread.currentThread().interrupt(); + } + } + } + renewer = new Thread(new Renewer()); + + // A Renewer running should not keep JVM from exiting, so make it a daemon. + renewer.setDaemon(true); + renewer.setName("AzureLeaseRenewer-" + threadNumber++); + renewer.start(); + LOG.debug("Acquired lease " + leaseID + " on " + blob.getUri() + + " managed by thread " + renewer.getName()); + } + + /** + * Free the lease and stop the keep-alive thread. + * @throws StorageException + */ + public void free() throws StorageException { + AccessCondition accessCondition = AccessCondition.generateEmptyCondition(); + accessCondition.setLeaseID(leaseID); + try { + blobWrapper.getBlob().releaseLease(accessCondition); + } catch (StorageException e) { + if (e.getErrorCode().equals("BlobNotFound")) { + + // Don't do anything -- it's okay to free a lease + // on a deleted file. The delete freed the lease + // implicitly. + } else { + + // This error is not anticipated, so re-throw it. + LOG.warn("Unanticipated exception when trying to free lease " + leaseID + + " on " + blobWrapper.getStorageUri()); + throw(e); + } + } finally { + + // Even if releasing the lease fails (e.g. because the file was deleted), + // make sure to record that we freed the lease, to terminate the + // keep-alive thread. + leaseFreed = true; + LOG.debug("Freed lease " + leaseID + " on " + blobWrapper.getUri() + + " managed by thread " + renewer.getName()); + } + } + + public boolean isFreed() { + return leaseFreed; + } + + public String getLeaseID() { + return leaseID; + } + + public CloudBlob getCloudBlob() { + return blobWrapper.getBlob(); + } + + private class Renewer implements Runnable { + + /** + * Start a keep-alive thread that will continue to renew + * the lease until it is freed or the process dies. + */ + @Override + public void run() { + LOG.debug("Starting lease keep-alive thread."); + AccessCondition accessCondition = + AccessCondition.generateEmptyCondition(); + accessCondition.setLeaseID(leaseID); + + while(!leaseFreed) { + try { + Thread.sleep(LEASE_RENEWAL_PERIOD); + } catch (InterruptedException e) { + LOG.debug("Keep-alive thread for lease " + leaseID + + " interrupted."); + + // Restore the interrupted status + Thread.currentThread().interrupt(); + } + try { + if (!leaseFreed) { + blobWrapper.getBlob().renewLease(accessCondition); + + // It'll be very rare to renew the lease (most will be short) + // so log that we did it, to help with system debugging. + LOG.info("Renewed lease " + leaseID + " on " + + getCloudBlob().getUri()); + } + } catch (StorageException e) { + if (!leaseFreed) { + + // Free the lease so we don't leave this thread running forever. + leaseFreed = true; + + // Normally leases should be freed and there should be no + // exceptions, so log a warning. + LOG.warn("Attempt to renew lease " + leaseID + " on " + + getCloudBlob().getUri() + + " failed, but lease not yet freed. Reason: " + + e.getMessage()); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java index 25f2883..d18a144 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java @@ -68,12 +68,14 @@ public class SelfThrottlingIntercept { private final float readFactor; private final float writeFactor; + private final OperationContext operationContext; // Concurrency: access to non-final members must be thread-safe private long lastE2Elatency; - public SelfThrottlingIntercept(OperationContext operationContext, + public SelfThrottlingIntercept(OperationContext operationContext, float readFactor, float writeFactor) { + this.operationContext = operationContext; this.readFactor = readFactor; this.writeFactor = writeFactor; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java index 2ce8ebd..d9d6fc3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java @@ -31,7 +31,8 @@ import org.apache.hadoop.util.Shell; */ @InterfaceAudience.Private public class ShellDecryptionKeyProvider extends SimpleKeyProvider { - static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; + static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = + "fs.azure.shellkeyprovider.script"; @Override public String getStorageAccountKey(String accountName, Configuration conf) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java index ef44a85..3cd3eda 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java @@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configuration; @InterfaceAudience.Private public class SimpleKeyProvider implements KeyProvider { - protected static final String KEY_ACCOUNT_KEY_PREFIX = "fs.azure.account.key."; + protected static final String KEY_ACCOUNT_KEY_PREFIX = + "fs.azure.account.key."; @Override public String getStorageAccountKey(String accountName, Configuration conf) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index 87cef86..8d0229d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -36,15 +37,17 @@ import com.microsoft.windowsazure.storage.StorageException; import com.microsoft.windowsazure.storage.blob.BlobListingDetails; import com.microsoft.windowsazure.storage.blob.BlobProperties; import com.microsoft.windowsazure.storage.blob.BlobRequestOptions; +import com.microsoft.windowsazure.storage.blob.CloudBlob; import com.microsoft.windowsazure.storage.blob.CopyState; import com.microsoft.windowsazure.storage.blob.ListBlobItem; +import com.microsoft.windowsazure.storage.blob.PageRange; /** * This is a very thin layer over the methods exposed by the Windows Azure * Storage SDK that we need for WASB implementation. This base class has a real * implementation that just simply redirects to the SDK, and a memory-backed one * that's used for unit tests. - * + * * IMPORTANT: all the methods here must remain very simple redirects since code * written here can't be properly unit tested. */ @@ -323,23 +326,39 @@ abstract class StorageInterface { * @throws URISyntaxException * If URI syntax exception occurred. */ - public abstract CloudBlockBlobWrapper getBlockBlobReference( + public abstract CloudBlobWrapper getBlockBlobReference( String relativePath) throws URISyntaxException, StorageException; + + /** + * Returns a wrapper for a CloudPageBlob. + * + * @param relativePath + * A String that represents the name of the blob, relative to the container + * + * @throws StorageException + * If a storage service error occurred. + * + * @throws URISyntaxException + * If URI syntax exception occurred. + */ + public abstract CloudBlobWrapper getPageBlobReference(String relativePath) + throws URISyntaxException, StorageException; } - + + /** - * A thin wrapper over the {@link CloudBlockBlob} class that simply redirects - * calls to the real object except in unit tests. + * A thin wrapper over the {@link CloudBlob} class that simply redirects calls + * to the real object except in unit tests. */ @InterfaceAudience.Private - public abstract static class CloudBlockBlobWrapper implements ListBlobItem { + public interface CloudBlobWrapper extends ListBlobItem { /** * Returns the URI for this blob. * * @return A java.net.URI object that represents the URI for * the blob. */ - public abstract URI getUri(); + URI getUri(); /** * Returns the metadata for the blob. @@ -347,7 +366,7 @@ abstract class StorageInterface { * @return A java.util.HashMap object that represents the * metadata for the blob. */ - public abstract HashMap getMetadata(); + HashMap getMetadata(); /** * Sets the metadata for the blob. @@ -356,37 +375,64 @@ abstract class StorageInterface { * A java.util.HashMap object that contains the * metadata being assigned to the blob. */ - public abstract void setMetadata(HashMap metadata); + void setMetadata(HashMap metadata); /** - * Copies an existing blob's contents, properties, and metadata to this - * instance of the CloudBlob class, using the specified - * operation context. - * - * @param sourceBlob - * A CloudBlob object that represents the source blob - * to copy. + * Copies an existing blob's contents, properties, and metadata to this instance of the CloudBlob + * class, using the specified operation context. + * + * @param source + * A java.net.URI The URI of a source blob. * @param opContext - * An {@link OperationContext} object that represents the context - * for the current operation. This object is used to track requests - * to the storage service, and to provide additional runtime - * information about the operation. - * + * An {@link OperationContext} object that represents the context for the current operation. This object + * is used to track requests to the storage service, and to provide additional runtime information about + * the operation. + * * @throws StorageException - * If a storage service error occurred. + * If a storage service error occurred. * @throws URISyntaxException - * + * */ - public abstract void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob, - OperationContext opContext) throws StorageException, URISyntaxException; - + public abstract void startCopyFromBlob(URI source, + OperationContext opContext) + throws StorageException, URISyntaxException; + /** * Returns the blob's copy state. * * @return A {@link CopyState} object that represents the copy state of the * blob. */ - public abstract CopyState getCopyState(); + CopyState getCopyState(); + + /** + * Downloads a range of bytes from the blob to the given byte buffer, using the specified request options and + * operation context. + * + * @param offset + * The byte offset to use as the starting point for the source. + * @param length + * The number of bytes to read. + * @param buffer + * The byte buffer, as an array of bytes, to which the blob bytes are downloaded. + * @param bufferOffset + * The byte offset to use as the starting point for the target. + * @param options + * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying + * null will use the default request options from the associated service client ( + * {@link CloudBlobClient}). + * @param opContext + * An {@link OperationContext} object that represents the context for the current operation. This object + * is used to track requests to the storage service, and to provide additional runtime information about + * the operation. + * + * @throws StorageException + * If a storage service error occurred. + */ + void downloadRange(final long offset, final long length, + final OutputStream outStream, final BlobRequestOptions options, + final OperationContext opContext) + throws StorageException, IOException; /** * Deletes the blob using the specified operation context. @@ -407,7 +453,7 @@ abstract class StorageInterface { * @throws StorageException * If a storage service error occurred. */ - public abstract void delete(OperationContext opContext) + void delete(OperationContext opContext, SelfRenewingLease lease) throws StorageException; /** @@ -419,13 +465,13 @@ abstract class StorageInterface { * to the storage service, and to provide additional runtime * information about the operation. * - * @return true if the blob exists, other wise + * @return true if the blob exists, otherwise * false. * * @throws StorageException - * f a storage service error occurred. + * If a storage service error occurred. */ - public abstract boolean exists(OperationContext opContext) + boolean exists(OperationContext opContext) throws StorageException; /** @@ -446,7 +492,7 @@ abstract class StorageInterface { * @throws StorageException * If a storage service error occurred. */ - public abstract void downloadAttributes(OperationContext opContext) + void downloadAttributes(OperationContext opContext) throws StorageException; /** @@ -455,7 +501,7 @@ abstract class StorageInterface { * @return A {@link BlobProperties} object that represents the properties of * the blob. */ - public abstract BlobProperties getProperties(); + BlobProperties getProperties(); /** * Opens a blob input stream to download the blob using the specified @@ -476,49 +522,10 @@ abstract class StorageInterface { * @throws StorageException * If a storage service error occurred. */ - public abstract InputStream openInputStream(BlobRequestOptions options, + InputStream openInputStream(BlobRequestOptions options, OperationContext opContext) throws StorageException; /** - * Creates and opens an output stream to write data to the block blob using - * the specified operation context. - * - * @param opContext - * An {@link OperationContext} object that represents the context - * for the current operation. This object is used to track requests - * to the storage service, and to provide additional runtime - * information about the operation. - * - * @return A {@link BlobOutputStream} object used to write data to the blob. - * - * @throws StorageException - * If a storage service error occurred. - */ - public abstract OutputStream openOutputStream(BlobRequestOptions options, - OperationContext opContext) throws StorageException; - - /** - * Uploads the source stream data to the blob, using the specified operation - * context. - * - * @param sourceStream - * An InputStream object that represents the input - * stream to write to the block blob. - * @param opContext - * An {@link OperationContext} object that represents the context - * for the current operation. This object is used to track requests - * to the storage service, and to provide additional runtime - * information about the operation. - * - * @throws IOException - * If an I/O error occurred. - * @throws StorageException - * If a storage service error occurred. - */ - public abstract void upload(InputStream sourceStream, - OperationContext opContext) throws StorageException, IOException; - - /** * Uploads the blob's metadata to the storage service using the specified * lease ID, request options, and operation context. * @@ -531,12 +538,15 @@ abstract class StorageInterface { * @throws StorageException * If a storage service error occurred. */ - public abstract void uploadMetadata(OperationContext opContext) + void uploadMetadata(OperationContext opContext) throws StorageException; - public abstract void uploadProperties(OperationContext opContext) + void uploadProperties(OperationContext opContext, + SelfRenewingLease lease) throws StorageException; + SelfRenewingLease acquireLease() throws StorageException; + /** * Sets the minimum read block size to use with this Blob. * @@ -545,7 +555,7 @@ abstract class StorageInterface { * while using a {@link BlobInputStream} object, ranging from 512 * bytes to 64 MB, inclusive. */ - public abstract void setStreamMinimumReadSizeInBytes( + void setStreamMinimumReadSizeInBytes( int minimumReadSizeBytes); /** @@ -560,7 +570,121 @@ abstract class StorageInterface { * If writeBlockSizeInBytes is less than 1 MB or * greater than 4 MB. */ - public abstract void setWriteBlockSizeInBytes(int writeBlockSizeBytes); + void setWriteBlockSizeInBytes(int writeBlockSizeBytes); + + CloudBlob getBlob(); + } + + /** + * A thin wrapper over the {@link CloudBlockBlob} class that simply redirects calls + * to the real object except in unit tests. + */ + public abstract interface CloudBlockBlobWrapper + extends CloudBlobWrapper { + /** + * Creates and opens an output stream to write data to the block blob using the specified + * operation context. + * + * @param opContext + * An {@link OperationContext} object that represents the context for the current operation. This object + * is used to track requests to the storage service, and to provide additional runtime information about + * the operation. + * + * @return A {@link BlobOutputStream} object used to write data to the blob. + * + * @throws StorageException + * If a storage service error occurred. + */ + OutputStream openOutputStream( + BlobRequestOptions options, + OperationContext opContext) throws StorageException; + } + + /** + * A thin wrapper over the {@link CloudPageBlob} class that simply redirects calls + * to the real object except in unit tests. + */ + public abstract interface CloudPageBlobWrapper + extends CloudBlobWrapper { + /** + * Creates a page blob using the specified request options and operation context. + * + * @param length + * The size, in bytes, of the page blob. + * @param options + * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying + * null will use the default request options from the associated service client ( + * {@link CloudBlobClient}). + * @param opContext + * An {@link OperationContext} object that represents the context for the current operation. This object + * is used to track requests to the storage service, and to provide additional runtime information about + * the operation. + * + * @throws IllegalArgumentException + * If the length is not a multiple of 512. + * + * @throws StorageException + * If a storage service error occurred. + */ + void create(final long length, BlobRequestOptions options, + OperationContext opContext) throws StorageException; + + + /** + * Uploads a range of contiguous pages, up to 4 MB in size, at the specified offset in the page blob, using the + * specified lease ID, request options, and operation context. + * + * @param sourceStream + * An InputStream object that represents the input stream to write to the page blob. + * @param offset + * The offset, in number of bytes, at which to begin writing the data. This value must be a multiple of + * 512. + * @param length + * The length, in bytes, of the data to write. This value must be a multiple of 512. + * @param options + * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying + * null will use the default request options from the associated service client ( + * {@link CloudBlobClient}). + * @param opContext + * An {@link OperationContext} object that represents the context for the current operation. This object + * is used to track requests to the storage service, and to provide additional runtime information about + * the operation. + * + * @throws IllegalArgumentException + * If the offset or length are not multiples of 512, or if the length is greater than 4 MB. + * @throws IOException + * If an I/O exception occurred. + * @throws StorageException + * If a storage service error occurred. + */ + void uploadPages(final InputStream sourceStream, final long offset, + final long length, BlobRequestOptions options, + OperationContext opContext) throws StorageException, IOException; + + /** + * Returns a collection of page ranges and their starting and ending byte offsets using the specified request + * options and operation context. + * + * @param options + * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying + * null will use the default request options from the associated service client ( + * {@link CloudBlobClient}). + * @param opContext + * An {@link OperationContext} object that represents the context for the current operation. This object + * is used to track requests to the storage service, and to provide additional runtime information about + * the operation. + * + * @return An ArrayList object that represents the set of page ranges and their starting and ending + * byte offsets. + * + * @throws StorageException + * If a storage service error occurred. + */ + ArrayList downloadPageRanges(BlobRequestOptions options, + OperationContext opContext) throws StorageException; + + void uploadMetadata(OperationContext opContext) + throws StorageException; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index 935bf71..e44823c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -39,13 +40,16 @@ import com.microsoft.windowsazure.storage.StorageUri; import com.microsoft.windowsazure.storage.blob.BlobListingDetails; import com.microsoft.windowsazure.storage.blob.BlobProperties; import com.microsoft.windowsazure.storage.blob.BlobRequestOptions; +import com.microsoft.windowsazure.storage.blob.CloudBlob; import com.microsoft.windowsazure.storage.blob.CloudBlobClient; import com.microsoft.windowsazure.storage.blob.CloudBlobContainer; import com.microsoft.windowsazure.storage.blob.CloudBlobDirectory; import com.microsoft.windowsazure.storage.blob.CloudBlockBlob; +import com.microsoft.windowsazure.storage.blob.CloudPageBlob; import com.microsoft.windowsazure.storage.blob.CopyState; import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption; import com.microsoft.windowsazure.storage.blob.ListBlobItem; +import com.microsoft.windowsazure.storage.blob.PageRange; /** * A real implementation of the Azure interaction layer that just redirects @@ -129,6 +133,8 @@ class StorageInterfaceImpl extends StorageInterface { return new CloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped); } else if (unwrapped instanceof CloudBlockBlob) { return new CloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped); + } else if (unwrapped instanceof CloudPageBlob) { + return new CloudPageBlobWrapperImpl((CloudPageBlob) unwrapped); } else { return unwrapped; } @@ -244,129 +250,217 @@ class StorageInterfaceImpl extends StorageInterface { } @Override - public CloudBlockBlobWrapper getBlockBlobReference(String relativePath) + public CloudBlobWrapper getBlockBlobReference(String relativePath) throws URISyntaxException, StorageException { - return new CloudBlockBlobWrapperImpl( - container.getBlockBlobReference(relativePath)); + return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath)); } + + @Override + public CloudBlobWrapper getPageBlobReference(String relativePath) + throws URISyntaxException, StorageException { + return new CloudPageBlobWrapperImpl( + container.getPageBlobReference(relativePath)); + } + } + + abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper { + private final CloudBlob blob; - // - // CloudBlockBlobWrapperImpl - // - @InterfaceAudience.Private - static class CloudBlockBlobWrapperImpl extends CloudBlockBlobWrapper { - private final CloudBlockBlob blob; + @Override + public CloudBlob getBlob() { + return blob; + } public URI getUri() { - return blob.getUri(); + return getBlob().getUri(); } - public CloudBlockBlobWrapperImpl(CloudBlockBlob blob) { + protected CloudBlobWrapperImpl(CloudBlob blob) { this.blob = blob; } @Override public HashMap getMetadata() { - return blob.getMetadata(); + return getBlob().getMetadata(); } @Override - public void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob, - OperationContext opContext) throws StorageException, URISyntaxException { - - blob.startCopyFromBlob(((CloudBlockBlobWrapperImpl) sourceBlob).blob, - null, null, null, opContext); - + public void delete(OperationContext opContext, SelfRenewingLease lease) + throws StorageException { + getBlob().delete(DeleteSnapshotsOption.NONE, getLeaseCondition(lease), + null, opContext); } - @Override - public void delete(OperationContext opContext) throws StorageException { - blob.delete(DeleteSnapshotsOption.NONE, null, null, opContext); + /** + * Return and access condition for this lease, or else null if + * there's no lease. + */ + private AccessCondition getLeaseCondition(SelfRenewingLease lease) { + AccessCondition leaseCondition = null; + if (lease != null) { + leaseCondition = AccessCondition.generateLeaseCondition(lease.getLeaseID()); + } + return leaseCondition; } @Override - public boolean exists(OperationContext opContext) throws StorageException { - return blob.exists(null, null, opContext); + public boolean exists(OperationContext opContext) + throws StorageException { + return getBlob().exists(null, null, opContext); } @Override - public void downloadAttributes(OperationContext opContext) - throws StorageException { - blob.downloadAttributes(null, null, opContext); + public void downloadAttributes( + OperationContext opContext) throws StorageException { + getBlob().downloadAttributes(null, null, opContext); } @Override public BlobProperties getProperties() { - return blob.getProperties(); + return getBlob().getProperties(); } @Override public void setMetadata(HashMap metadata) { - blob.setMetadata(metadata); + getBlob().setMetadata(metadata); } @Override - public InputStream openInputStream(BlobRequestOptions options, + public InputStream openInputStream( + BlobRequestOptions options, OperationContext opContext) throws StorageException { - return blob.openInputStream(null, options, opContext); + return getBlob().openInputStream(null, options, opContext); } - @Override - public OutputStream openOutputStream(BlobRequestOptions options, + public OutputStream openOutputStream( + BlobRequestOptions options, OperationContext opContext) throws StorageException { - return blob.openOutputStream(null, options, opContext); + return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext); } - @Override public void upload(InputStream sourceStream, OperationContext opContext) throws StorageException, IOException { - blob.upload(sourceStream, 0, null, null, opContext); + getBlob().upload(sourceStream, 0, null, null, opContext); } @Override public CloudBlobContainer getContainer() throws URISyntaxException, StorageException { - return blob.getContainer(); + return getBlob().getContainer(); } @Override public CloudBlobDirectory getParent() throws URISyntaxException, StorageException { - return blob.getParent(); + return getBlob().getParent(); } @Override public void uploadMetadata(OperationContext opContext) throws StorageException { - blob.uploadMetadata(null, null, opContext); + getBlob().uploadMetadata(null, null, opContext); } - @Override - public void uploadProperties(OperationContext opContext) + public void uploadProperties(OperationContext opContext, SelfRenewingLease lease) throws StorageException { - blob.uploadProperties(null, null, opContext); + + // Include lease in request if lease not null. + getBlob().uploadProperties(getLeaseCondition(lease), null, opContext); } @Override public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) { - blob.setStreamMinimumReadSizeInBytes(minimumReadSizeBytes); + getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes); } @Override public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) { - blob.setStreamWriteSizeInBytes(writeBlockSizeBytes); + getBlob().setStreamWriteSizeInBytes(writeBlockSizeBytes); } @Override public StorageUri getStorageUri() { - return blob.getStorageUri(); + return getBlob().getStorageUri(); } @Override public CopyState getCopyState() { - return blob.getCopyState(); + return getBlob().getCopyState(); + } + + @Override + public void startCopyFromBlob(URI source, + OperationContext opContext) + throws StorageException, URISyntaxException { + getBlob().startCopyFromBlob(source, + null, null, null, opContext); + } + + @Override + public void downloadRange(long offset, long length, OutputStream outStream, + BlobRequestOptions options, OperationContext opContext) + throws StorageException, IOException { + + getBlob().downloadRange(offset, length, outStream, null, options, opContext); + } + + @Override + public SelfRenewingLease acquireLease() throws StorageException { + return new SelfRenewingLease(this); + } + } + + + // + // CloudBlockBlobWrapperImpl + // + + static class CloudBlockBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudBlockBlobWrapper { + public CloudBlockBlobWrapperImpl(CloudBlockBlob blob) { + super(blob); + } + + public OutputStream openOutputStream( + BlobRequestOptions options, + OperationContext opContext) throws StorageException { + return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext); + } + + public void upload(InputStream sourceStream, OperationContext opContext) + throws StorageException, IOException { + getBlob().upload(sourceStream, 0, null, null, opContext); + } + + public void uploadProperties(OperationContext opContext) + throws StorageException { + getBlob().uploadProperties(null, null, opContext); + } + + } + + static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper { + public CloudPageBlobWrapperImpl(CloudPageBlob blob) { + super(blob); + } + + public void create(final long length, BlobRequestOptions options, + OperationContext opContext) throws StorageException { + ((CloudPageBlob) getBlob()).create(length, null, options, opContext); + } + + public void uploadPages(final InputStream sourceStream, final long offset, + final long length, BlobRequestOptions options, OperationContext opContext) + throws StorageException, IOException { + ((CloudPageBlob) getBlob()).uploadPages(sourceStream, offset, length, null, + options, opContext); + } + + public ArrayList downloadPageRanges(BlobRequestOptions options, + OperationContext opContext) throws StorageException { + return ((CloudPageBlob) getBlob()).downloadPageRanges( + null, options, opContext); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java new file mode 100644 index 0000000..9bec7a5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.fs.Syncable; + +/** + * Support the Syncable interface on top of a DataOutputStream. + * This allows passing the sync/hflush/hsync calls through to the + * wrapped stream passed in to the constructor. This is required + * for HBase when wrapping a PageBlobOutputStream used as a write-ahead log. + */ +public class SyncableDataOutputStream extends DataOutputStream implements Syncable { + + public SyncableDataOutputStream(OutputStream out) { + super(out); + } + + @Override + public void hflush() throws IOException { + if (out instanceof Syncable) { + ((Syncable) out).hflush(); + } else { + out.flush(); + } + } + + @Override + public void hsync() throws IOException { + if (out instanceof Syncable) { + ((Syncable) out).hsync(); + } else { + out.flush(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java index e098cef..dd354d7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java @@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DelegateToFileSystem; - /** * WASB implementation of AbstractFileSystem. * This impl delegates to the old FileSystem http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java index e389d7c..a08ad71 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java @@ -41,11 +41,11 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong; @InterfaceAudience.Public @InterfaceStability.Evolving public final class AzureFileSystemInstrumentation implements MetricsSource { - + public static final String METRIC_TAG_FILESYSTEM_ID = "wasbFileSystemId"; public static final String METRIC_TAG_ACCOUNT_NAME = "accountName"; public static final String METRIC_TAG_CONTAINTER_NAME = "containerName"; - + public static final String WASB_WEB_RESPONSES = "wasb_web_responses"; public static final String WASB_BYTES_WRITTEN = "wasb_bytes_written_last_second"; @@ -381,7 +381,6 @@ public final class AzureFileSystemInstrumentation implements MetricsSource { */ public long getCurrentMaximumDownloadBandwidth() { return currentMaximumDownloadBytesPerSecond; - } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java index e3f5d44..676adb9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java @@ -33,8 +33,7 @@ import com.microsoft.windowsazure.storage.StorageEvent; /** * An event listener to the ResponseReceived event from Azure Storage that will - * update metrics appropriately. - * + * update metrics appropriately when it gets that event. */ @InterfaceAudience.Private public final class ResponseReceivedMetricUpdater extends StorageEvent { @@ -43,7 +42,7 @@ public final class ResponseReceivedMetricUpdater extends StorageEvent allMetrics = new ConcurrentLinkedQueue(); - + private static boolean metricsConfigSaved = false; private AzureBlobStorageTestAccount(NativeAzureFileSystem fs, - CloudStorageAccount account, CloudBlobContainer container) { + CloudStorageAccount account, + CloudBlobContainer container) { this.account = account; this.container = container; this.fs = fs; @@ -158,6 +162,14 @@ public final class AzureBlobStorageTestAccount { return toMockUri(path.toUri().getRawPath().substring(1)); } + public static Path pageBlobPath() { + return new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY); + } + + public static Path pageBlobPath(String fileName) { + return new Path(pageBlobPath(), fileName); + } + public Number getLatestMetricValue(String metricName, Number defaultValue) throws IndexOutOfBoundsException{ boolean found = false; @@ -206,8 +218,10 @@ public final class AzureBlobStorageTestAccount { * The blob key (no initial slash). * @return The blob reference. */ - public CloudBlockBlob getBlobReference(String blobKey) throws Exception { - return container.getBlockBlobReference(String.format(blobKey)); + public CloudBlockBlob getBlobReference(String blobKey) + throws Exception { + return container.getBlockBlobReference( + String.format(blobKey)); } /** @@ -233,45 +247,79 @@ public final class AzureBlobStorageTestAccount { getBlobReference(blobKey).releaseLease(accessCondition); } + private static void saveMetricsConfigFile() { + if (!metricsConfigSaved) { + new org.apache.hadoop.metrics2.impl.ConfigBuilder() + .add("azure-file-system.sink.azuretestcollector.class", + StandardCollector.class.getName()) + .save("hadoop-metrics2-azure-file-system.properties"); + metricsConfigSaved = true; + } + } + public static AzureBlobStorageTestAccount createMock() throws Exception { return createMock(new Configuration()); } - public static AzureBlobStorageTestAccount createMock(Configuration conf) - throws Exception { + public static AzureBlobStorageTestAccount createMock(Configuration conf) throws Exception { + saveMetricsConfigFile(); + configurePageBlobDir(conf); + configureAtomicRenameDir(conf); AzureNativeFileSystemStore store = new AzureNativeFileSystemStore(); MockStorageInterface mockStorage = new MockStorageInterface(); store.setAzureStorageInteractionLayer(mockStorage); NativeAzureFileSystem fs = new NativeAzureFileSystem(store); - addWasbToConfiguration(conf); setMockAccountKey(conf); // register the fs provider. fs.initialize(new URI(MOCK_WASB_URI), conf); - AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs, - mockStorage); + AzureBlobStorageTestAccount testAcct = + new AzureBlobStorageTestAccount(fs, mockStorage); return testAcct; } /** + * Set the page blob directories configuration to the default if it is not + * already set. Some tests may set it differently (e.g. the page blob + * tests in TestNativeAzureFSPageBlobLive). + * @param conf The configuration to conditionally update. + */ + private static void configurePageBlobDir(Configuration conf) { + if (conf.get(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES) == null) { + conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, + "/" + DEFAULT_PAGE_BLOB_DIRECTORY); + } + } + + /** Do the same for the atomic rename directories configuration */ + private static void configureAtomicRenameDir(Configuration conf) { + if (conf.get(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES) == null) { + conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, + DEFAULT_ATOMIC_RENAME_DIRECTORIES); + } + } + + /** * Creates a test account that goes against the storage emulator. * * @return The test account, or null if the emulator isn't setup. */ public static AzureBlobStorageTestAccount createForEmulator() throws Exception { + saveMetricsConfigFile(); NativeAzureFileSystem fs = null; CloudBlobContainer container = null; Configuration conf = createTestConfiguration(); if (!conf.getBoolean(USE_EMULATOR_PROPERTY_NAME, false)) { // Not configured to test against the storage emulator. - System.out.println("Skipping emulator Azure test because configuration " - + "doesn't indicate that it's running." - + " Please see README.txt for guidance."); + System.out + .println("Skipping emulator Azure test because configuration " + + "doesn't indicate that it's running." + + " Please see RunningLiveWasbTests.txt for guidance."); return null; } - CloudStorageAccount account = CloudStorageAccount - .getDevelopmentStorageAccount(); + CloudStorageAccount account = + CloudStorageAccount.getDevelopmentStorageAccount(); fs = new NativeAzureFileSystem(); String containerName = String.format("wasbtests-%s-%tQ", System.getProperty("user.name"), new Date()); @@ -285,14 +333,18 @@ public final class AzureBlobStorageTestAccount { fs.initialize(accountUri, conf); // Create test account initializing the appropriate member variables. - AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs, - account, container); + // + AzureBlobStorageTestAccount testAcct = + new AzureBlobStorageTestAccount(fs, account, container); return testAcct; } public static AzureBlobStorageTestAccount createOutOfBandStore( int uploadBlockSize, int downloadBlockSize) throws Exception { + + saveMetricsConfigFile(); + CloudBlobContainer container = null; Configuration conf = createTestConfiguration(); CloudStorageAccount account = createTestAccount(conf); @@ -337,8 +389,9 @@ public final class AzureBlobStorageTestAccount { testStorage.initialize(accountUri, conf, instrumentation); // Create test account initializing the appropriate member variables. - AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount( - testStorage, account, container); + // + AzureBlobStorageTestAccount testAcct = + new AzureBlobStorageTestAccount(testStorage, account, container); return testAcct; } @@ -416,11 +469,11 @@ public final class AzureBlobStorageTestAccount { } } - private static Configuration createTestConfiguration() { + public static Configuration createTestConfiguration() { return createTestConfiguration(null); } - protected static Configuration createTestConfiguration(Configuration conf) { + private static Configuration createTestConfiguration(Configuration conf) { if (conf == null) { conf = new Configuration(); } @@ -429,16 +482,9 @@ public final class AzureBlobStorageTestAccount { return conf; } - // for programmatic setting of the wasb configuration. - // note that tests can also get the - public static void addWasbToConfiguration(Configuration conf) { - conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem"); - conf.set("fs.wasbs.impl", - "org.apache.hadoop.fs.azure.NativeAzureFileSystem"); - } - - static CloudStorageAccount createTestAccount() throws URISyntaxException, - KeyProviderException { + static CloudStorageAccount createTestAccount() + throws URISyntaxException, KeyProviderException + { return createTestAccount(createTestConfiguration()); } @@ -447,8 +493,8 @@ public final class AzureBlobStorageTestAccount { String testAccountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME); if (testAccountName == null) { System.out - .println("Skipping live Azure test because of missing test account." - + " Please see README.txt for guidance."); + .println("Skipping live Azure test because of missing test account." + + " Please see RunningLiveWasbTests.txt for guidance."); return null; } return createStorageAccount(testAccountName, conf, false); @@ -466,9 +512,12 @@ public final class AzureBlobStorageTestAccount { public static AzureBlobStorageTestAccount create(String containerNameSuffix, EnumSet createOptions, Configuration initialConfiguration) throws Exception { + saveMetricsConfigFile(); NativeAzureFileSystem fs = null; CloudBlobContainer container = null; Configuration conf = createTestConfiguration(initialConfiguration); + configurePageBlobDir(conf); + configureAtomicRenameDir(conf); CloudStorageAccount account = createTestAccount(conf); if (account == null) { return null; @@ -510,15 +559,18 @@ public final class AzureBlobStorageTestAccount { fs.initialize(accountUri, conf); // Create test account initializing the appropriate member variables. - AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs, - account, container); + // + AzureBlobStorageTestAccount testAcct = + new AzureBlobStorageTestAccount(fs, account, container); return testAcct; } private static String generateContainerName() throws Exception { - String containerName = String.format("wasbtests-%s-%tQ", - System.getProperty("user.name"), new Date()); + String containerName = + String.format ("wasbtests-%s-%tQ", + System.getProperty("user.name"), + new Date()); return containerName; } @@ -548,12 +600,16 @@ public final class AzureBlobStorageTestAccount { if (readonly) { // Set READ permissions - sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ, + sasPolicy.setPermissions(EnumSet.of( + SharedAccessBlobPermissions.READ, SharedAccessBlobPermissions.LIST)); } else { // Set READ and WRITE permissions. - sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ, - SharedAccessBlobPermissions.WRITE, SharedAccessBlobPermissions.LIST)); + // + sasPolicy.setPermissions(EnumSet.of( + SharedAccessBlobPermissions.READ, + SharedAccessBlobPermissions.WRITE, + SharedAccessBlobPermissions.LIST)); } // Create the container permissions. @@ -590,8 +646,11 @@ public final class AzureBlobStorageTestAccount { SharedAccessBlobPolicy sasPolicy = new SharedAccessBlobPolicy(); // Set READ and WRITE permissions. - sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ, - SharedAccessBlobPermissions.WRITE, SharedAccessBlobPermissions.LIST, + // + sasPolicy.setPermissions(EnumSet.of( + SharedAccessBlobPermissions.READ, + SharedAccessBlobPermissions.WRITE, + SharedAccessBlobPermissions.LIST, SharedAccessBlobPermissions.DELETE)); // Create the container permissions. @@ -725,8 +784,9 @@ public final class AzureBlobStorageTestAccount { // Create test account initializing the appropriate member variables. // Set the container value to null for the default root container. - AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs, - account, blobRoot); + // + AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount( + fs, account, blobRoot); // Return to caller with test account. return testAcct; @@ -805,5 +865,12 @@ public final class AzureBlobStorageTestAccount { public void flush() { } } - + + public void setPageBlobDirectory(String directory) { + this.pageBlobDirectory = directory; + } + + public String getPageBlobDirectory() { + return pageBlobDirectory; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java index ab35961..b8971c4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java @@ -41,12 +41,15 @@ public class InMemoryBlockBlobStore { private final String key; private final HashMap metadata; private final int contentLength; + private final boolean isPageBlob; + ListBlobEntry(String key, HashMap metadata, - int contentLength) { + int contentLength, boolean isPageBlob) { this.key = key; this.metadata = metadata; this.contentLength = contentLength; + this.isPageBlob = isPageBlob; } public String getKey() { @@ -60,6 +63,10 @@ public class InMemoryBlockBlobStore { public int getContentLength() { return contentLength; } + + public boolean isPageBlob() { + return isPageBlob; + } } /** @@ -77,10 +84,13 @@ public class InMemoryBlockBlobStore { ArrayList list = new ArrayList(); for (Map.Entry entry : blobs.entrySet()) { if (entry.getKey().startsWith(prefix)) { - list.add(new ListBlobEntry(entry.getKey(), - includeMetadata ? new HashMap( - entry.getValue().metadata) : null, - entry.getValue().content.length)); + list.add(new ListBlobEntry( + entry.getKey(), + includeMetadata ? + new HashMap(entry.getValue().metadata) : + null, + entry.getValue().content.length, + entry.getValue().isPageBlob)); } } return list; @@ -92,19 +102,49 @@ public class InMemoryBlockBlobStore { @SuppressWarnings("unchecked") public synchronized void setContent(String key, byte[] value, + HashMap metadata, boolean isPageBlob, + long length) { + blobs.put(key, new Entry(value, (HashMap)metadata.clone(), + isPageBlob, length)); + } + + @SuppressWarnings("unchecked") + public synchronized void setMetadata(String key, HashMap metadata) { - blobs - .put(key, new Entry(value, (HashMap) metadata.clone())); + blobs.get(key).metadata = (HashMap) metadata.clone(); } - public OutputStream upload(final String key, + public OutputStream uploadBlockBlob(final String key, final HashMap metadata) { - setContent(key, new byte[0], metadata); + setContent(key, new byte[0], metadata, false, 0); + return new ByteArrayOutputStream() { + @Override + public void flush() + throws IOException { + super.flush(); + byte[] tempBytes = toByteArray(); + setContent(key, tempBytes, metadata, false, tempBytes.length); + } + @Override + public void close() + throws IOException { + super.close(); + byte[] tempBytes = toByteArray(); + setContent(key, tempBytes, metadata, false, tempBytes.length); + } + }; + } + + public OutputStream uploadPageBlob(final String key, + final HashMap metadata, + final long length) { + setContent(key, new byte[0], metadata, true, length); return new ByteArrayOutputStream() { @Override - public void flush() throws IOException { + public void flush() + throws IOException { super.flush(); - setContent(key, toByteArray(), metadata); + setContent(key, toByteArray(), metadata, true, length); } }; } @@ -137,10 +177,16 @@ public class InMemoryBlockBlobStore { private static class Entry { private byte[] content; private HashMap metadata; + private boolean isPageBlob; + @SuppressWarnings("unused") // TODO: use it + private long length; - public Entry(byte[] content, HashMap metadata) { + public Entry(byte[] content, HashMap metadata, + boolean isPageBlob, long length) { this.content = content; this.metadata = metadata; + this.isPageBlob = isPageBlob; + this.length = length; } } }