Subject: svn commit: r1527113 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common/src: main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/io/ main/java/org/apache/hadoop/util/ test/java/org/apache/hadoop/util/
Date: Fri, 27 Sep 2013 22:51:13 -0000
From: cnauroth@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <20130927225114.325C3238896F@eris.apache.org>

Author: cnauroth
Date: Fri Sep 27 22:51:12 2013
New Revision: 1527113

URL: http://svn.apache.org/r1527113
Log:
HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and branch-2. Contributed by Chris Nauroth.
Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ByteBufferUtil {
+
+  /**
+   * Determine if a stream can do a byte buffer read via read(ByteBuffer buf)
+   */
+  private static boolean streamHasByteBufferRead(InputStream stream) {
+    if (!(stream instanceof ByteBufferReadable)) {
+      return false;
+    }
+    if (!(stream instanceof FSDataInputStream)) {
+      return true;
+    }
+    return ((FSDataInputStream)stream).getWrappedStream()
+        instanceof ByteBufferReadable;
+  }
+
+  /**
+   * Perform a fallback read.
+   */
+  public static ByteBuffer fallbackRead(
+      InputStream stream, ByteBufferPool bufferPool, int maxLength)
+      throws IOException {
+    if (bufferPool == null) {
+      throw new UnsupportedOperationException("zero-copy reads " +
+          "were not available, and you did not provide a fallback " +
+          "ByteBufferPool.");
+    }
+    boolean useDirect = streamHasByteBufferRead(stream);
+    ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength);
+    if (buffer == null) {
+      throw new UnsupportedOperationException("zero-copy reads " +
+          "were not available, and the ByteBufferPool did not provide " +
+          "us with " + (useDirect ? "a direct" : "an indirect") +
+          " buffer.");
+    }
+    Preconditions.checkState(buffer.capacity() > 0);
+    Preconditions.checkState(buffer.isDirect() == useDirect);
+    maxLength = Math.min(maxLength, buffer.capacity());
+    boolean success = false;
+    try {
+      if (useDirect) {
+        buffer.clear();
+        buffer.limit(maxLength);
+        ByteBufferReadable readable = (ByteBufferReadable)stream;
+        int totalRead = 0;
+        while (true) {
+          if (totalRead >= maxLength) {
+            success = true;
+            break;
+          }
+          int nRead = readable.read(buffer);
+          if (nRead < 0) {
+            if (totalRead > 0) {
+              success = true;
+            }
+            break;
+          }
+          totalRead += nRead;
+        }
+        buffer.flip();
+      } else {
+        buffer.clear();
+        int nRead = stream.read(buffer.array(),
+            buffer.arrayOffset(), maxLength);
+        if (nRead >= 0) {
+          buffer.limit(nRead);
+          success = true;
+        }
+      }
+    } finally {
+      if (!success) {
+        // If we got an error while reading, or if we are at EOF, we
+        // don't need the buffer any more.  We can give it back to the
+        // bufferPool.
+        bufferPool.putBuffer(buffer);
+        buffer = null;
+      }
+    }
+    return buffer;
+  }
+}
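The fallback path above is easiest to see with a short caller sketch. This is illustrative only and not part of this commit; the local-FS path, file name, and 4 KB request size are assumptions.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ByteBufferUtil;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class FallbackReadSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/example.dat"));
        ByteBufferPool pool = new ElasticByteBufferPool();  // added later in this commit
        try {
          // fallbackRead picks a direct buffer when the stream supports
          // read(ByteBuffer), and a heap buffer otherwise.
          ByteBuffer buf = ByteBufferUtil.fallbackRead(in, pool, 4096);
          if (buf == null) {
            System.out.println("at EOF");
          } else {
            System.out.println("read " + buf.remaining() + " bytes");
            pool.putBuffer(buf);  // the caller returns the buffer to the pool
          }
        } finally {
          in.close();
        }
      }
    }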
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java Fri Sep 27 22:51:12 2013
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,17 +20,29 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.fs.ByteBufferUtil;
+import org.apache.hadoop.util.IdentityHashStore;
 
 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
  * and buffers input through a {@link BufferedInputStream}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FSDataInputStream extends DataInputStream
-    implements Seekable, PositionedReadable, Closeable,
-      ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
+    implements Seekable, PositionedReadable, Closeable,
+      ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+      HasEnhancedByteBufferAccess {
+  /**
+   * Map ByteBuffers that we have handed out to readers to ByteBufferPool
+   * objects.
+   */
+  private final IdentityHashStore<ByteBuffer, ByteBufferPool>
+      extendedReadBuffers
+      = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
 
   public FSDataInputStream(InputStream in)
   throws IOException {
@@ -167,4 +180,45 @@ public class FSDataInputStream extends D
           "support setting the drop-behind caching setting.");
     }
   }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+      EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException {
+    try {
+      return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
+          maxLength, opts);
+    }
+    catch (ClassCastException e) {
+      ByteBuffer buffer = ByteBufferUtil.
+          fallbackRead(this, bufferPool, maxLength);
+      if (buffer != null) {
+        extendedReadBuffers.put(buffer, bufferPool);
+      }
+      return buffer;
+    }
+  }
+
+  private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
+      EnumSet.noneOf(ReadOption.class);
+
+  final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
+      throws IOException, UnsupportedOperationException {
+    return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
+  }
+
+  @Override
+  public void releaseBuffer(ByteBuffer buffer) {
+    try {
+      ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
+    }
+    catch (ClassCastException e) {
+      ByteBufferPool bufferPool = extendedReadBuffers.remove(buffer);
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("tried to release a buffer " +
+            "that was not created by this stream.");
+      }
+      bufferPool.putBuffer(buffer);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java Fri Sep 27 22:51:12 2013
@@ -18,9 +18,11 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ZeroCopyUnavailableException;
 
 /****************************************************************
  * FSInputStream is a generic old InputStream with a little bit
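Taken together with ByteBufferUtil, the intended client calling pattern for the new FSDataInputStream read is roughly as follows. This is a sketch under assumed names; the path "data.bin" and the 1 MB request size are placeholders.

    import java.nio.ByteBuffer;
    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class ZeroCopyReadExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        ByteBufferPool pool = new ElasticByteBufferPool();
        FSDataInputStream in = fs.open(new Path("/user/example/data.bin"));
        try {
          while (true) {
            // Zero-copy (e.g. mmap) when the wrapped stream supports it;
            // otherwise FSDataInputStream falls back to copying into a
            // buffer taken from the pool.
            ByteBuffer buf = in.read(pool, 1024 * 1024,
                EnumSet.of(ReadOption.SKIP_CHECKSUMS));
            if (buf == null) {
              break;  // null is returned on EOF, and only on EOF
            }
            process(buf);
            in.releaseBuffer(buf);  // every non-null buffer must be released
          }
        } finally {
          in.close();
        }
      }

      private static void process(ByteBuffer buf) { /* consume buf */ }
    }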
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+
+/**
+ * FSDataInputStreams implement this interface to provide enhanced
+ * byte buffer access.  Usually this takes the form of mmap support.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface HasEnhancedByteBufferAccess {
+  /**
+   * Get a ByteBuffer containing file data.
+   *
+   * This ByteBuffer may come from the stream itself, via a call like mmap,
+   * or it may come from the ByteBufferPool which is passed in as an
+   * argument.
+   *
+   * @param factory
+   *          If this is non-null, it will be used to create a fallback
+   *          ByteBuffer when the stream itself cannot create one.
+   * @param maxLength
+   *          The maximum length of buffer to return.  We may return a buffer
+   *          which is shorter than this.
+   * @param opts
+   *          Options to use when reading.
+   *
+   * @return
+   *          We will return null on EOF (and only on EOF).
+   *          Otherwise, we will return a direct ByteBuffer containing at
+   *          least one byte.  You must free this ByteBuffer when you are
+   *          done with it by calling releaseBuffer on it.
+   *          The buffer will continue to be readable until it is released
+   *          in this manner.  However, the input stream's close method may
+   *          warn about unclosed buffers.
+   * @throws IOException
+   *          if there was an error reading.
+   * @throws UnsupportedOperationException
+   *          if factory was null, and we needed an external byte buffer.
+   *          This exception will never be thrown unless the factory
+   *          argument is null.
+   */
+  public ByteBuffer read(ByteBufferPool factory, int maxLength,
+      EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException;
+
+  /**
+   * Release a ByteBuffer which was created by the enhanced ByteBuffer read
+   * function.  You must not continue using the ByteBuffer after calling this
+   * function.
+   *
+   * @param buffer
+   *          The ByteBuffer to release.
+   */
+  public void releaseBuffer(ByteBuffer buffer);
+}
\ No newline at end of file
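A minimal sketch of an implementer of this interface (the class name FallbackOnlyStream is hypothetical): it has no zero-copy path of its own, so it delegates to ByteBufferUtil.fallbackRead and tracks handed-out buffers the same way FSDataInputStream does above.

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.ByteBufferUtil;
    import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.util.IdentityHashStore;

    public class FallbackOnlyStream extends FilterInputStream
        implements HasEnhancedByteBufferAccess {
      private final IdentityHashStore<ByteBuffer, ByteBufferPool> buffers =
          new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);

      public FallbackOnlyStream(InputStream in) {
        super(in);
      }

      @Override
      public ByteBuffer read(ByteBufferPool factory, int maxLength,
          EnumSet<ReadOption> opts) throws IOException {
        // fallbackRead throws UnsupportedOperationException when factory is
        // null, and returns null only at EOF, matching the contract above.
        ByteBuffer buf = ByteBufferUtil.fallbackRead(this, factory, maxLength);
        if (buf != null) {
          buffers.put(buf, factory);  // remember which pool owns this buffer
        }
        return buf;
      }

      @Override
      public void releaseBuffer(ByteBuffer buffer) {
        ByteBufferPool pool = buffers.remove(buffer);
        if (pool == null) {
          throw new IllegalArgumentException(
              "tried to release a buffer not created by this stream");
        }
        pool.putBuffer(buffer);
      }
    }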
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Options that can be used when reading from a FileSystem.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum ReadOption {
+  /**
+   * Skip checksums when reading.  This option may be useful when reading a
+   * file format that has built-in checksums, or for testing purposes.
+   */
+  SKIP_CHECKSUMS,
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+public class ZeroCopyUnavailableException extends IOException {
+  private static final long serialVersionUID = 0L;
+
+  public ZeroCopyUnavailableException(String message) {
+    super(message);
+  }
+
+  public ZeroCopyUnavailableException(String message, Exception e) {
+    super(message, e);
+  }
+
+  public ZeroCopyUnavailableException(Exception e) {
+    super(e);
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface ByteBufferPool {
+  /**
+   * Get a new ByteBuffer.  The pool can provide this by removing a
+   * buffer from its internal cache, or by allocating a new buffer.
+   *
+   * @param direct     Whether the buffer should be direct.
+   * @param length     The requested length of the buffer.
+   * @return           A new ByteBuffer.  Its directness must match the
+   *                   direct argument.  Its capacity can be less than
+   *                   what was requested, but must be at least 1 byte.
+   */
+  ByteBuffer getBuffer(boolean direct, int length);
+
+  /**
+   * Release a buffer back to the pool.
+   * The pool may choose to put this buffer into its cache.
+   *
+   * @param buffer    The ByteBuffer to return to the pool.
+   */
+  void putBuffer(ByteBuffer buffer);
+}
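For contrast with ElasticByteBufferPool below, the simplest conforming implementation is a pool that does no caching at all. This is a sketch, not part of this commit; the class name is made up.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ByteBufferPool;

    public class SimpleByteBufferPool implements ByteBufferPool {
      @Override
      public ByteBuffer getBuffer(boolean direct, int length) {
        // Always allocate fresh; directness matches the request.
        return direct ? ByteBuffer.allocateDirect(length)
                      : ByteBuffer.allocate(length);
      }

      @Override
      public void putBuffer(ByteBuffer buffer) {
        // No cache: drop the buffer and let the garbage collector
        // (or the direct-buffer cleaner) reclaim it.
      }
    }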
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is a simple ByteBufferPool which just creates ByteBuffers as needed.
+ * It also caches ByteBuffers after they're released.  It will always return
+ * the smallest cached buffer with at least the capacity you request.
+ * We don't try to do anything clever here, such as limiting the maximum
+ * cache size.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public final class ElasticByteBufferPool implements ByteBufferPool {
+  private static final class Key implements Comparable<Key> {
+    private final int capacity;
+    private final long insertionTime;
+
+    Key(int capacity, long insertionTime) {
+      this.capacity = capacity;
+      this.insertionTime = insertionTime;
+    }
+
+    @Override
+    public int compareTo(Key other) {
+      return ComparisonChain.start().
+          compare(capacity, other.capacity).
+          compare(insertionTime, other.insertionTime).
+          result();
+    }
+
+    @Override
+    public boolean equals(Object rhs) {
+      if (rhs == null) {
+        return false;
+      }
+      try {
+        Key o = (Key)rhs;
+        return (compareTo(o) == 0);
+      } catch (ClassCastException e) {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(capacity).
+          append(insertionTime).
+          toHashCode();
+    }
+  }
+
+  private final TreeMap<Key, ByteBuffer> buffers =
+      new TreeMap<Key, ByteBuffer>();
+
+  private final TreeMap<Key, ByteBuffer> directBuffers =
+      new TreeMap<Key, ByteBuffer>();
+
+  private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+    return direct ? directBuffers : buffers;
+  }
+
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+    Map.Entry<Key, ByteBuffer> entry =
+        tree.ceilingEntry(new Key(length, 0));
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+                      ByteBuffer.allocate(length);
+    }
+    tree.remove(entry.getKey());
+    return entry.getValue();
+  }
+
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+    while (true) {
+      Key key = new Key(buffer.capacity(), System.nanoTime());
+      if (!tree.containsKey(key)) {
+        tree.put(key, buffer);
+        return;
+      }
+      // Buffers are indexed by (capacity, time).
+      // If our key is not unique on the first try, we try again, since the
+      // time will be different.  Since we use nanoseconds, it's pretty
+      // unlikely that we'll loop even once, unless the system clock has a
+      // poor granularity.
+    }
+  }
+}
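The smallest-buffer-with-at-least-the-requested-capacity behavior described in the class comment can be seen in a few lines (illustrative only; the sizes are arbitrary):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class ElasticPoolDemo {
      public static void main(String[] args) {
        ElasticByteBufferPool pool = new ElasticByteBufferPool();
        ByteBuffer big = pool.getBuffer(false, 8192);  // fresh allocation
        pool.putBuffer(big);                           // cached under (8192, t)
        ByteBuffer reused = pool.getBuffer(false, 4096);
        // ceilingEntry finds the 8192-byte buffer, so it is reused even
        // though only 4096 bytes were requested.
        System.out.println(reused == big);             // true
        System.out.println(reused.capacity());         // 8192
      }
    }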
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The IdentityHashStore stores (key, value) mappings in an array.
+ * It is similar to java.util.Hashtable, but much more lightweight.
+ * Neither inserting nor removing an element ever leads to any garbage
+ * getting created (assuming the array doesn't need to be enlarged).
+ *
+ * Unlike Hashtable, it compares keys using
+ * {@link System#identityHashCode(Object)} and the identity operator.
+ * This is useful for types like ByteBuffer which have expensive hashCode
+ * and equals operators.
+ *
+ * We use linear probing to resolve collisions.  This avoids the need for
+ * the overhead of linked list data structures.  It also means that it is
+ * expensive to attempt to remove an element that isn't there, since we
+ * have to look at the entire array to be sure that it doesn't exist.
+ *
+ * @param <K>    The key type to use.
+ * @param <V>    The value type to use.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@SuppressWarnings("unchecked")
+public final class IdentityHashStore<K, V> {
+  /**
+   * Even elements are keys; odd elements are values.
+   * The array has 4 * capacity slots: two per entry, at a load
+   * factor of 0.5.
+   */
+  private Object buffer[];
+
+  private int numInserted = 0;
+
+  private int capacity;
+
+  /**
+   * The default maxCapacity value to use.
+   */
+  private static final int DEFAULT_MAX_CAPACITY = 2;
+
+  public IdentityHashStore(int capacity) {
+    Preconditions.checkArgument(capacity >= 0);
+    if (capacity == 0) {
+      this.capacity = 0;
+      this.buffer = null;
+    } else {
+      // Round the capacity we need up to a power of 2.
+      realloc((int)Math.pow(2,
+          Math.ceil(Math.log(capacity) / Math.log(2))));
+    }
+  }
+
+  private void realloc(int newCapacity) {
+    Preconditions.checkArgument(newCapacity > 0);
+    Object prevBuffer[] = buffer;
+    this.capacity = newCapacity;
+    // Each element takes two array slots -- one for the key,
+    // and another for the value.  We also want a load factor
+    // of 0.50.  Combine those together and you get 4 * newCapacity.
+    this.buffer = new Object[4 * newCapacity];
+    this.numInserted = 0;
+    if (prevBuffer != null) {
+      for (int i = 0; i < prevBuffer.length; i += 2) {
+        if (prevBuffer[i] != null) {
+          putInternal(prevBuffer[i], prevBuffer[i + 1]);
+        }
+      }
+    }
+  }
+
+  private void putInternal(Object k, Object v) {
+    int hash = System.identityHashCode(k);
+    final int numEntries = buffer.length / 2;
+    int index = hash % numEntries;
+    while (true) {
+      if (buffer[2 * index] == null) {
+        buffer[2 * index] = k;
+        buffer[1 + (2 * index)] = v;
+        numInserted++;
+        return;
+      }
+      index = (index + 1) % numEntries;
+    }
+  }
+
+  /**
+   * Add a new (key, value) mapping.
+   *
+   * Inserting a new (key, value) never overwrites a previous one.
+   * In other words, you can insert the same key multiple times and it will
+   * lead to multiple entries.
+   */
+  public void put(K k, V v) {
+    Preconditions.checkNotNull(k);
+    if (buffer == null) {
+      realloc(DEFAULT_MAX_CAPACITY);
+    } else if (numInserted + 1 > capacity) {
+      realloc(capacity * 2);
+    }
+    putInternal(k, v);
+  }
+
+  private int getElementIndex(K k) {
+    if (buffer == null) {
+      return -1;
+    }
+    final int numEntries = buffer.length / 2;
+    int hash = System.identityHashCode(k);
+    int index = hash % numEntries;
+    int firstIndex = index;
+    do {
+      if (buffer[2 * index] == k) {
+        return index;
+      }
+      index = (index + 1) % numEntries;
+    } while (index != firstIndex);
+    return -1;
+  }
+
+  /**
+   * Retrieve a value associated with a given key.
+   */
+  public V get(K k) {
+    int index = getElementIndex(k);
+    if (index < 0) {
+      return null;
+    }
+    return (V)buffer[1 + (2 * index)];
+  }
+
+  /**
+   * Retrieve a value associated with a given key, and delete the
+   * relevant entry.
+   */
+  public V remove(K k) {
+    int index = getElementIndex(k);
+    if (index < 0) {
+      return null;
+    }
+    V val = (V)buffer[1 + (2 * index)];
+    buffer[2 * index] = null;
+    buffer[1 + (2 * index)] = null;
+    numInserted--;
+    return val;
+  }
+
+  public boolean isEmpty() {
+    return numInserted == 0;
+  }
+
+  public int numElements() {
+    return numInserted;
+  }
+
+  public int capacity() {
+    return capacity;
+  }
+
+  public interface Visitor<K, V> {
+    void accept(K k, V v);
+  }
+
+  /**
+   * Visit all key, value pairs in the IdentityHashStore.
+   */
+  public void visitAll(Visitor<K, V> visitor) {
+    int length = buffer == null ? 0 : buffer.length;
+    for (int i = 0; i < length; i += 2) {
+      if (buffer[i] != null) {
+        visitor.accept((K)buffer[i], (V)buffer[i + 1]);
+      }
+    }
+  }
+}
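Because lookups use reference equality rather than equals(), two distinct but content-equal ByteBuffers are different keys, which is exactly the property FSDataInputStream relies on when tracking handed-out buffers. A small demonstration (illustrative only; the class name is made up):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.util.IdentityHashStore;

    public class IdentityHashStoreDemo {
      public static void main(String[] args) {
        IdentityHashStore<ByteBuffer, String> store =
            new IdentityHashStore<ByteBuffer, String>(4);
        ByteBuffer a = ByteBuffer.wrap(new byte[] { 1, 2, 3 });
        ByteBuffer b = ByteBuffer.wrap(new byte[] { 1, 2, 3 });
        store.put(a, "first");
        // a.equals(b) is true (ByteBuffer equality is content-based), but
        // the store compares keys with ==, so b is not found:
        System.out.println(store.get(b));  // prints: null
        System.out.println(store.get(a));  // prints: first
      }
    }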
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.hadoop.util.IdentityHashStore.Visitor;
+import org.junit.Test;
+
+public class TestIdentityHashStore {
+  private static final Log LOG =
+      LogFactory.getLog(TestIdentityHashStore.class.getName());
+
+  private static class Key {
+    private final String name;
+
+    Key(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public int hashCode() {
+      throw new RuntimeException("should not be used!");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Key)) {
+        return false;
+      }
+      Key other = (Key)o;
+      return name.equals(other.name);
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testStartingWithZeroCapacity() {
+    IdentityHashStore<Key, Integer> store =
+        new IdentityHashStore<Key, Integer>(0);
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("found key " + k + " in empty IdentityHashStore.");
+      }
+    });
+    Assert.assertTrue(store.isEmpty());
+    final Key key1 = new Key("key1");
+    Integer value1 = new Integer(100);
+    store.put(key1, value1);
+    Assert.assertTrue(!store.isEmpty());
+    Assert.assertEquals(value1, store.get(key1));
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.assertEquals(key1, k);
+      }
+    });
+    Assert.assertEquals(value1, store.remove(key1));
+    Assert.assertTrue(store.isEmpty());
+  }
+
+  @Test(timeout=60000)
+  public void testDuplicateInserts() {
+    IdentityHashStore<Key, Integer> store =
+        new IdentityHashStore<Key, Integer>(4);
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("found key " + k + " in empty IdentityHashStore.");
+      }
+    });
+    Assert.assertTrue(store.isEmpty());
+    Key key1 = new Key("key1");
+    Integer value1 = new Integer(100);
+    Integer value2 = new Integer(200);
+    Integer value3 = new Integer(300);
+    store.put(key1, value1);
+    Key equalToKey1 = new Key("key1");
+
+    // IdentityHashStore compares by object equality, not equals()
+    Assert.assertNull(store.get(equalToKey1));
+
+    Assert.assertTrue(!store.isEmpty());
+    Assert.assertEquals(value1, store.get(key1));
+    store.put(key1, value2);
+    store.put(key1, value3);
+    final List<Integer> allValues = new LinkedList<Integer>();
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        allValues.add(v);
+      }
+    });
+    Assert.assertEquals(3, allValues.size());
+    for (int i = 0; i < 3; i++) {
+      Integer value = store.remove(key1);
+      Assert.assertTrue(allValues.remove(value));
+    }
+    Assert.assertNull(store.remove(key1));
+    Assert.assertTrue(store.isEmpty());
+  }
+
+  @Test(timeout=60000)
+  public void testAdditionsAndRemovals() {
+    IdentityHashStore<Key, Integer> store =
+        new IdentityHashStore<Key, Integer>(0);
+    final int NUM_KEYS = 1000;
+    LOG.debug("generating " + NUM_KEYS + " keys");
+    final List<Key> keys = new ArrayList<Key>(NUM_KEYS);
+    for (int i = 0; i < NUM_KEYS; i++) {
+      keys.add(new Key("key " + i));
+    }
+    for (int i = 0; i < NUM_KEYS; i++) {
+      store.put(keys.get(i), i);
+    }
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.assertTrue(keys.contains(k));
+      }
+    });
+    for (int i = 0; i < NUM_KEYS; i++) {
+      Assert.assertEquals(Integer.valueOf(i),
+          store.remove(keys.get(i)));
+    }
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("expected all entries to be removed");
+      }
+    });
+    Assert.assertTrue("expected the store to be " +
+        "empty, but found " + store.numElements() + " elements.",
+        store.isEmpty());
+    Assert.assertEquals(1024, store.capacity());
+  }
+
+}
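The new test can be run by itself from the hadoop-common module with the standard Surefire filter (assuming a normal trunk checkout and working build environment):

    mvn test -Dtest=TestIdentityHashStore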