geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [11/63] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code
Date Wed, 04 May 2016 22:57:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
deleted file mode 100644
index 5ba20d2..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
+++ /dev/null
@@ -1,853 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.gemstone.gemfire.internal.hll.HyperLogLog;
-import com.gemstone.gemfire.internal.hll.ICardinality;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.ShutdownHookManager;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.ScanOperation;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.util.Hex;
-import com.gemstone.gemfire.internal.util.SingletonValue;
-import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
-import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
-import org.apache.hadoop.hbase.util.BloomFilterFactory;
-import org.apache.hadoop.hbase.util.BloomFilterWriter;
-
-/**
- * Implements hfile based {@link Hoplog}
- */
-public final class HFileSortedOplog extends AbstractHoplog {
-
-//  private static final boolean CACHE_DATA_BLOCKS_ON_READ = !Boolean.getBoolean("gemfire.HFileSortedOplog.DISABLE_CACHE_ON_READ");
-  private final CacheConfig cacheConf;
-  private ICardinality entryCountEstimate;
-  
-  // a cached reader for the file
-  private final SingletonValue<HFileReader> reader;
-
-  public HFileSortedOplog(HDFSStoreImpl store, Path hfilePath,
-      BlockCache blockCache, SortedOplogStatistics stats,
-      HFileStoreStatistics storeStats) throws IOException {
-    super(store, hfilePath, stats);
-    cacheConf = getCacheConfInstance(blockCache, stats, storeStats);
-    reader = getReaderContainer();
-  }
-
-  /**
-   * THIS METHOD SHOULD BE USED FOR LONER ONLY
-   */
-  public static HFileSortedOplog getHoplogForLoner(FileSystem inputFS,
-      Path hfilePath) throws IOException {
-    return new HFileSortedOplog(inputFS, hfilePath, null, null, null);
-  }
-
-  private HFileSortedOplog(FileSystem inputFS, Path hfilePath,
-      BlockCache blockCache, SortedOplogStatistics stats,
-      HFileStoreStatistics storeStats) throws IOException {
-    super(inputFS, hfilePath, stats);
-    cacheConf = getCacheConfInstance(blockCache, stats, storeStats);
-    reader = getReaderContainer();
-  }
-
-  protected CacheConfig getCacheConfInstance(BlockCache blockCache,
-      SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
-    CacheConfig tmpConfig = null;
-//    if (stats == null) {
-      tmpConfig = new CacheConfig(conf);
-//    } else {
-//      tmpConfig = new CacheConfig(conf, CACHE_DATA_BLOCKS_ON_READ, blockCache,
-//          HFileSortedOplogFactory.convertStatistics(stats, storeStats));
-//    }
-    tmpConfig.shouldCacheBlockOnRead(BlockCategory.ALL_CATEGORIES);
-    return tmpConfig;
-  }  
-
-  private SingletonValue<HFileReader> getReaderContainer() {
-    return new SingletonValue<HFileReader>(new SingletonBuilder<HFileReader>()
{
-      @Override
-      public HFileReader create() throws IOException {
-        if (logger.isDebugEnabled())
-          logger.debug("{}Creating hoplog reader", logPrefix);
-        return new HFileReader();
-      }
-
-      @Override
-      public void postCreate() {
-        if (readerListener != null) {
-          readerListener.readerCreated();
-        }
-      }
-      
-      @Override
-      public void createInProgress() {
-      }
-    });
-  }
-  
-  @Override
-  public HoplogReader getReader() throws IOException {
-    return reader.get();
-  }
-  
-  @Override
-  public ICardinality getEntryCountEstimate() throws IOException {
-    ICardinality result = entryCountEstimate;
-    if (result == null) {
-      HoplogReader rdr = getReader(); // keep this out of the critical section
-      synchronized(this) {
-        result = entryCountEstimate;
-          if (result == null) {
-            entryCountEstimate = result = rdr.getCardinalityEstimator();
-          }
-        }
-    }
-    return result;
-  }
-  
-  @Override
-  public HoplogWriter createWriter(int keys) throws IOException {
-    return new HFileSortedOplogWriter(keys);
-  }
-
-  @Override
-  public boolean isClosed() {
-    HFileReader rdr = reader.getCachedValue();
-    return rdr == null || rdr.isClosed();
-  }
-  
-  @Override
-  public void close() throws IOException {
-    close(true);
-  }
-
-  @Override
-  public void close(boolean clearCache) throws IOException {
-    compareAndClose(null, clearCache);
-  }
-  
-  private void compareAndClose(HFileReader hfileReader, boolean clearCache) throws IOException
{
-    HFileReader rdr ;
-    if (hfileReader == null) {
-      rdr = reader.clear(true);
-    } else {
-      boolean result = reader.clear(hfileReader, true);
-      if (! result) {
-        if (logger.isDebugEnabled())
-          logger.debug("{}skipping close, provided hfileReader mismatched", logPrefix);
-        return;
-      } 
-      rdr = hfileReader;
-    }
-    
-    if (rdr != null) {
-      try {
-        rdr.close(clearCache);
-      } finally {
-        if (readerListener != null) {
-          readerListener.readerClosed();
-        }
-      }
-    }
-  }
-  
-  @Override
-  public String toString() {
-    return "HFileSortedOplog[" + getFileName() + "]";
-  }
-
-  private class HFileSortedOplogWriter implements HoplogWriter {
-    private final Writer writer;
-    private final BloomFilterWriter bfw;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-
-    public HFileSortedOplogWriter(int keys) throws IOException {
-      try {
-        int hfileBlockSize = Integer.getInteger(
-            HoplogConfig.HFILE_BLOCK_SIZE_CONF, (1 << 16));
-
-        Algorithm compress = Algorithm.valueOf(System.getProperty(HoplogConfig.COMPRESSION,
-            HoplogConfig.COMPRESSION_DEFAULT));
-
-//        ByteComparator bc = new ByteComparator();
-        writer = HFile.getWriterFactory(conf, cacheConf)
-            .withPath(fsProvider.getFS(), path)
-            .withBlockSize(hfileBlockSize)
-//            .withComparator(bc)
-            .withCompression(compress)
-            .create();
-//        bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW,
keys,
-//            writer, bc);
-        bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW,
keys,
-            writer);
-
-        if (logger.isDebugEnabled())
-          logger.debug("{}Created hoplog writer with compression " + compress, logPrefix);
-      } catch (IOException e) {
-        if (logger.isDebugEnabled())
-          logger.debug("{}IO Error while creating writer", logPrefix);
-        throw e;
-      }
-    }
-
-    @Override
-    public void append(byte[] key, byte[] value) throws IOException {
-      writer.append(key, value);
-      bfw.add(key, 0, key.length);
-    }
-
-    @Override
-    public void append(ByteBuffer key, ByteBuffer value) throws IOException {
-      byte[] keyBytes = byteBufferToArray(key);
-      byte[] valueBytes = byteBufferToArray(value);
-      writer.append(keyBytes, valueBytes);
-      bfw.add(keyBytes, 0, keyBytes.length);
-    }
-
-    @Override
-    public void close() throws IOException {
-      close(null);
-    }
-
-    @Override
-    public void close(EnumMap<Meta, byte[]> metadata) throws IOException {
-      if (closed.get()) {
-        if (logger.isDebugEnabled())
-          logger.debug("{}Writer already closed", logPrefix);
-        return;
-      }
-      
-      bfw.compactBloom();
-      writer.addGeneralBloomFilter(bfw);
-
-      // append system metadata
-      writer.appendFileInfo(Meta.GEMFIRE_MAGIC.toBytes(), Hoplog.MAGIC);
-      writer.appendFileInfo(Meta.SORTED_OPLOG_VERSION.toBytes(), HoplogVersion.V1.toBytes());
-      writer.appendFileInfo(Meta.GEMFIRE_VERSION.toBytes(), Version.CURRENT.toBytes());
-      
-      // append comparator info
-//      if (writer.getComparator() instanceof DelegatingSerializedComparator) {
-//        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-//        DataOutput out = new DataOutputStream(bos);
-//        
-//        writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators());
-//        writer.appendFileInfo(Meta.COMPARATORS.toBytes(), bos.toByteArray());
-//      }
-      
-      // append user metadata
-      HyperLogLog cachedEntryCountEstimate = null;
-      if (metadata != null) {
-        for (Entry<Meta, byte[]> entry : metadata.entrySet()) {
-          writer.appendFileInfo(entry.getKey().toBytes(), entry.getValue());
-          if (Meta.LOCAL_CARDINALITY_ESTIMATE_V2.equals(entry.getKey())) {
-             cachedEntryCountEstimate = HyperLogLog.Builder.build(entry.getValue()); 
-          }
-        }
-      }
-      
-      writer.close();
-      if (logger.isDebugEnabled())
-        logger.debug("{}Completed closing writer", logPrefix);
-      closed.set(true);
-      // cache estimate value to avoid reads later
-      entryCountEstimate = cachedEntryCountEstimate;
-    }
-
-    @Override
-    public void hsync() throws IOException {
-      throw new UnsupportedOperationException("hsync is not supported for HFiles"); 
-    }
-
-    @Override
-    public long getCurrentSize() throws IOException {
-      throw new UnsupportedOperationException("getCurrentSize is not supported for HFiles");

-    }
-    
-//    private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators)
throws IOException {
-//      out.writeInt(comparators.length);
-//      for (SerializedComparator sc : comparators) {
-//        out.writeUTF(sc.getClass().getName());
-//        if (sc instanceof DelegatingSerializedComparator) {
-//          writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators());
-//        }
-//      }
-//    }
-  }
-  
-  private void handleReadIOError(HFileReader hfileReader, IOException e, boolean skipFailIfSafe)
{
-    if (logger.isDebugEnabled())
-      logger.debug("Read IO error", e);
-    boolean safeError = ShutdownHookManager.get().isShutdownInProgress();
-    if (safeError) {
-      // IOException because of closed file system. This happens when member is
-      // shutting down
-      if (logger.isDebugEnabled())
-        logger.debug("IO error caused by filesystem shutdown", e);
-      throw new CacheClosedException("IO error caused by filesystem shutdown", e);
-    } 
-    
-    // expose the error wrapped inside remote exception. Remote exceptions are
-    // handled by file system client. So let the caller handle this error
-    if (e instanceof RemoteException) {
-      e = ((RemoteException) e).unwrapRemoteException();
-      throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),
e);
-    } 
-    
-    FileSystem currentFs = fsProvider.checkFileSystem();
-    if (hfileReader != null && hfileReader.previousFS != currentFs) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Detected new FS client, closing old reader", logPrefix);
-        if (currentFs != null) {
-          if (logger.isDebugEnabled())
-            logger.debug("CurrentFs:" + currentFs.getUri() + "-"
-                + currentFs.hashCode(), logPrefix);
-        }
-        if (hfileReader.previousFS != null) {
-          if (logger.isDebugEnabled())
-            logger.debug("OldFs:" + hfileReader.previousFS.getUri() + "-"
-                + hfileReader.previousFS.hashCode() + ", closing old reader", logPrefix);
-        }
-      }
-      try {
-        HFileSortedOplog.this.compareAndClose(hfileReader, false);
-      } catch (Exception ex) {
-        if (logger.isDebugEnabled())
-          logger.debug("Failed to close reader", ex);
-      }
-      if (skipFailIfSafe) {
-        if (logger.isDebugEnabled())
-          logger.debug("Not faling after io error since FS client changed");
-        return;
-      }
-    }
-
-    // it is not a safe error. let the caller handle it
-    throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),
e);
-  }
-
-  class HFileReader implements HoplogReader, Closeable {
-    private final Reader reader;
-    private volatile BloomFilter hoplogBloom;
-    private final AtomicBoolean closed;
-    private final Map<byte[], byte[]> fileInfo;
-    private final HyperLogLog estimator;
-    private final FileSystem previousFS;
-    
-    public HFileReader() throws IOException {
-      try {
-        FileSystem fs = fsProvider.getFS();
-        reader = HFile.createReader(fs, path, cacheConf);
-        fileInfo = reader.loadFileInfo();
-        closed = new AtomicBoolean(false);
-
-        validate();
-        if (reader.getComparator() instanceof DelegatingSerializedComparator) {
-          loadComparators((DelegatingSerializedComparator) reader.getComparator());
-        }
-
-        // read the old HLL if it exists so that a CardinalityMergeException will trigger
a Major Compaction
-        byte[] hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE.toBytes());
-        if (hll != null) {
-          entryCountEstimate = estimator = HyperLogLog.Builder.build(hll);
-        } else if ((hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE_V2.toBytes())) !=
null) {
-          entryCountEstimate = estimator = HyperLogLog.Builder.build(hll);
-        } else {
-          estimator = new HyperLogLog(HdfsSortedOplogOrganizer.HLL_CONSTANT);
-        }
-        
-        previousFS = fs;
-      } catch (IOException e) {
-        if (logger.isDebugEnabled())
-          logger.debug("IO Error while creating reader", e);
-        throw e;
-      }
-    }
-
-    @Override
-    public byte[] read(byte[] key) throws IOException {
-      IOException err = null;
-      HFileReader delegateReader = this;
-      for (int retry = 1; retry >= 0; retry --) {
-        try {
-          return delegateReader.readDelegate(key);
-        } catch (IOException e) {
-          err = e;
-          handleReadIOError(delegateReader, e, retry > 0);
-          // Current reader may have got closed in error handling. Get the new
-          // one for retry attempt
-          try {
-            delegateReader = (HFileReader) HFileSortedOplog.this.getReader(); 
-          } catch (IOException ex) {
-            handleReadIOError(null, e, false);
-          }
-        }
-      }
-
-      if (logger.isDebugEnabled())
-        logger.debug("Throwing err from read delegate ", err);
-      throw err;
-    }
-
-    private byte[] readDelegate(byte[] key) throws IOException {
-      try {
-        if (!getBloomFilter().mightContain(key)) {
-          // bloom filter check failed, the key is not present in this hoplog
-          return null;
-        }
-      } catch (IllegalArgumentException e) {
-        if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
-          throw (IOException) e.getCause();
-        } else {
-          throw e;
-        }
-      }
-      
-      byte[] valueBytes = null;
-      ByteBuffer bb = get(key);
-      if (bb != null) {
-        valueBytes = new byte[bb.remaining()];
-        bb.get(valueBytes);
-      } else {
-        stats.getBloom().falsePositive();
-      }
-      return valueBytes;
-    }
-
-    @Override
-    public ByteBuffer get(byte[] key) throws IOException {
-      assert key != null;
-      HFileScanner seek = reader.getScanner(false, true);
-      if (seek.seekTo(key) == 0) {
-        return seek.getValue();
-      }
-      return null;
-    }
-
-    @Override
-    public HoplogIterator<byte[], byte[]> scan(byte[] from, boolean fromInclusive,
byte[] to,
-        boolean toInclusive) throws IOException {
-      IOException err = null;
-      HFileReader delegateReader = this;
-      for (int retry = 1; retry >= 0; retry --) {
-        try {
-          return delegateReader.scanDelegate(from, fromInclusive, to, toInclusive);
-        } catch (IOException e) {
-          err = e;
-          handleReadIOError(delegateReader, e, retry > 0);
-          // Current reader may have got closed in error handling. Get the new
-          // one for retry attempt
-          try {
-            delegateReader = (HFileReader) HFileSortedOplog.this.getReader(); 
-          } catch (IOException ex) {
-            handleReadIOError(null, e, false);
-          }
-        }
-      }
-      if (logger.isDebugEnabled())
-        logger.debug("Throwing err from scan delegate ", err);
-      throw err;
-    }
-
-    private HoplogIterator<byte[], byte[]> scanDelegate(byte[] from, boolean fromInclusive,
byte[] to,
-        boolean toInclusive) throws IOException {
-      return new HFileSortedIterator(reader.getScanner(true, false), from,
-          fromInclusive, to, toInclusive);
-    }
-    
-    @Override
-    public HoplogIterator<byte[], byte[]> scan(long offset, long length)
-        throws IOException {
-      /**
-       * Identifies the first and last key to be scanned based on offset and
-       * length. It loads hfile block index and identifies the first hfile block
-       * starting after offset. The key of that block is from key for scanner.
-       * Similarly it locates first block starting beyond offset + length range.
-       * It uses key of that block as the to key for scanner
-       */
-
-      // load block indexes in memory
-      BlockIndexReader bir = reader.getDataBlockIndexReader();
-      int blockCount = bir.getRootBlockCount();
-      
-      byte[] fromKey = null, toKey = null;
-
-      // find from key
-      int i = 0;
-      for (; i < blockCount; i++) {
-        if (bir.getRootBlockOffset(i) < offset) {
-          // hfile block has offset less than this reader's split offset. check
-          // the next block
-          continue;
-        }
-
-        // found the first hfile block starting after offset
-        fromKey = bir.getRootBlockKey(i);
-        break;
-      }
-
-      if (fromKey == null) {
-        // seems no block starts after the offset. return no-op scanner
-        return new HFileSortedIterator(null, null, false, null, false);
-      }
-      
-      // find to key
-      for (; i < blockCount; i++) {
-        if (bir.getRootBlockOffset(i) < (offset + length)) {
-          // this hfile block lies within the offset+lenght range. check the
-          // next block for a higher offset
-          continue;
-        }
-
-        // found the first block starting beyong offset+length range.
-        toKey = bir.getRootBlockKey(i);
-        break;
-      }
-
-      // from key is included in scan and to key is excluded
-      HFileScanner scanner = reader.getScanner(true, false);
-      return new HFileSortedIterator(scanner, fromKey, true, toKey, false);
-    }
-    
-    @Override
-    public HoplogIterator<byte[], byte[]> scan() throws IOException {
-      return scan(null, null);
-    }
-
-    public HoplogIterator<byte[], byte[]> scan(byte[] from, byte[] to)
-        throws IOException {
-      return scan(from, true, to, false);
-    }
-
-    @Override
-    public BloomFilter getBloomFilter() throws IOException {
-      BloomFilter result = hoplogBloom;
-      if (result == null) {
-        synchronized (this) {
-          result = hoplogBloom;
-          if (result == null) {
-            hoplogBloom = result = new BloomFilterImpl();
-          }
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public boolean isClosed() {
-      return closed.get();
-    }
-    
-    @Override
-    public void close() throws IOException {
-      close(true);
-    }
-    
-    public void close(boolean clearCache) throws IOException {
-      if (closed.compareAndSet(false, true)) {
-        if (logger.isDebugEnabled())
-          logger.debug("{}Closing reader", logPrefix);
-        reader.close(clearCache);
-      }
-    }
-
-    @Override
-    public long getEntryCount() {
-      return reader.getEntries();
-    }
-
-    public ICardinality getCardinalityEstimator() {
-      return estimator;
-    }
-
-    @Override
-    public long sizeEstimate() {
-      return getCardinalityEstimator().cardinality();
-    }
-
-    private void validate() throws IOException {
-      // check magic
-      byte[] magic = fileInfo.get(Meta.GEMFIRE_MAGIC.toBytes());
-      if (!Arrays.equals(magic, MAGIC)) {
-        throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic)));
-      }
-      
-      // check version compatibility
-      byte[] ver = fileInfo.get(Meta.SORTED_OPLOG_VERSION.toBytes());
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Hoplog version is " + Hex.toHex(ver), logPrefix);
-      }
-      
-      if (!Arrays.equals(ver, HoplogVersion.V1.toBytes())) {
-        throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver)));
-      }
-    }
-    
-    private void loadComparators(DelegatingSerializedComparator comparator) throws IOException
{
-      byte[] raw = fileInfo.get(Meta.COMPARATORS.toBytes());
-      assert raw != null;
-
-      DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
-      comparator.setComparators(readComparators(in));
-    }
-    
-    private SerializedComparator[] readComparators(DataInput in) throws IOException {
-      try {
-        SerializedComparator[] comps = new SerializedComparator[in.readInt()];
-        assert comps.length > 0;
-        
-        for (int i = 0; i < comps.length; i++) {
-          comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance();
-          if (comps[i] instanceof DelegatingSerializedComparator) {
-            ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in));
-          }
-        }
-        return comps;
-        
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-    
-    class BloomFilterImpl implements BloomFilter {
-      private final org.apache.hadoop.hbase.util.BloomFilter hfileBloom;
-
-      public BloomFilterImpl() throws IOException {
-        DataInput bin = reader.getGeneralBloomFilterMetadata();
-        // instantiate bloom filter if meta present in hfile
-        if (bin != null) {
-          hfileBloom = BloomFilterFactory.createFromMeta(bin, reader);
-          if (reader.getComparator() instanceof DelegatingSerializedComparator) {
-            loadComparators((DelegatingSerializedComparator) hfileBloom.getComparator());
-          }
-        } else {
-          hfileBloom = null;
-        }
-      }
-
-      @Override
-      public boolean mightContain(byte[] key) {
-        assert key != null;
-        return mightContain(key, 0, key.length);
-      }
-
-      @Override
-      public boolean mightContain(byte[] key, int keyOffset, int keyLength) {
-        assert key != null;
-        long start = stats.getBloom().begin();
-        boolean found = hfileBloom == null ? true : hfileBloom.contains(key, keyOffset, keyLength,
null);
-        stats.getBloom().end(start);
-        return found;
-      }
-
-      @Override
-      public long getBloomSize() {
-        return hfileBloom == null ? 0 : hfileBloom.getByteSize();
-      }
-    }
-
-    // TODO change the KV types to ByteBuffer instead of byte[]
-    public final class HFileSortedIterator implements HoplogIterator<byte[], byte[]>
{
-      private final HFileScanner scan;
-      
-      private final byte[] from;
-      private final boolean fromInclusive;
-      
-      private final byte[] to;
-      private final boolean toInclusive;
-      
-      private ByteBuffer prefetchedKey;
-      private ByteBuffer prefetchedValue;
-      private ByteBuffer currentKey;
-      private ByteBuffer currentValue;
-      
-      // variable linked to scan stats
-      ScanOperation scanStat;
-      private long scanStart;
-      
-      public HFileSortedIterator(HFileScanner scan, byte[] from, boolean fromInclusive, byte[]
to, 
-          boolean toInclusive) throws IOException {
-        this.scan = scan;
-        this.from = from;
-        this.fromInclusive = fromInclusive;
-        this.to = to;
-        this.toInclusive = toInclusive;
-
-        scanStat = (stats == null) ? new SortedOplogStatistics("", "").new ScanOperation(
-            0, 0, 0, 0, 0, 0, 0) : stats.getScan();
-        scanStart = scanStat.begin();
-
-        if (scan == null) {
-          return;
-        }
-
-        assert from == null || to == null
-            || scan.getReader().getComparator().compare(from, to) <= 0;
-
-        initIterator();
-      }
-      
-      /*
-       * prefetches first key and value from the file for hasnext to work
-       */
-      private void initIterator() throws IOException {
-        long startNext = scanStat.beginIteration();
-        boolean scanSuccessful = true;
-        if (from == null) {
-          scanSuccessful = scan.seekTo();
-        } else {
-          int compare = scan.seekTo(from);
-          if (compare == 0 && !fromInclusive || compare > 0) {
-            // as from in exclusive and first key is same as from, skip the first key
-            scanSuccessful = scan.next();
-          }
-        }
-        
-        populateKV(startNext, scanSuccessful);
-      }
-      
-      @Override
-      public boolean hasNext() {
-        return prefetchedKey != null;
-      }
-
-      @Override
-      public byte[] next() throws IOException {
-        return byteBufferToArray(nextBB());
-      }
-
-      public ByteBuffer nextBB() throws IOException {
-        long startNext = scanStat.beginIteration();
-        if (prefetchedKey == null) {
-          throw new NoSuchElementException();
-        }
-
-        currentKey = prefetchedKey;
-        currentValue = prefetchedValue;
-
-        prefetchedKey = null;
-        prefetchedValue = null;
-
-        if (scan.next()) {
-          populateKV(startNext, true);
-        }
-        
-        return currentKey;
-      }
-
-      
-      private void populateKV(long nextStartTime, boolean scanSuccessful) {
-        if (!scanSuccessful) {
-          //end of file reached. collect stats and return
-          scanStat.endIteration(0, nextStartTime);
-          return;
-        }
-        
-        prefetchedKey = scan.getKey();
-        prefetchedValue = scan.getValue();
-        
-        if (to != null) {
-          // TODO Optimization? Perform int comparison instead of byte[]. Identify
-          // offset of key greater than two.
-          int compare = -1;
-          compare = scan.getReader().getComparator().compare
-              (prefetchedKey.array(), prefetchedKey.arrayOffset(), prefetchedKey.remaining(),
to, 0, to.length);
-          if (compare > 0 || (compare == 0 && !toInclusive)) {
-            prefetchedKey = null;
-            prefetchedValue = null;
-            return;
-          }
-        }
-        
-        // account for bytes read and time spent
-        int byteCount = prefetchedKey.remaining() + prefetchedValue.remaining();
-        scanStat.endIteration(byteCount, nextStartTime);
-      }
-      
-
-      @Override
-      public byte[] getKey() {
-        return byteBufferToArray(getKeyBB());
-      }
-      public ByteBuffer getKeyBB() {
-        return currentKey;
-      }
-
-      @Override
-      public byte[] getValue() {
-        return byteBufferToArray(getValueBB());
-      }
-      public ByteBuffer getValueBB() {
-        return currentValue;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException("Cannot delete a key-value from a hfile sorted
oplog");
-      }
-      
-      @Override
-      public void close() {
-        scanStat.end(scanStart);
-      }
-    }
-  }
-  
-  public static byte[] byteBufferToArray(ByteBuffer bb) {
-    if (bb == null) {
-      return null;
-    }
-    
-    byte[] tmp = new byte[bb.remaining()];
-    bb.duplicate().get(tmp);
-    return tmp;
-  }
-}


Mime
View raw message