lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject svn commit: r1561751 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/store/blockcache/
Date Mon, 27 Jan 2014 17:03:10 GMT
Author: markrmiller
Date: Mon Jan 27 17:03:09 2014
New Revision: 1561751

URL: http://svn.apache.org/r1561751
Log:
SOLR-5666: Using the hdfs write cache can result in appearance of corrupted index.

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Jan 27 17:03:09 2014
@@ -229,6 +229,9 @@ Bug Fixes
 * SOLR-5663: example-DIH uses non-existing column for mapping (case-sensitive)
   (steffkes)
 
+* SOLR-5666: Using the hdfs write cache can result in appearance of corrupted
+  index. (Mark Miller)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java Mon Jan 27 17:03:09 2014
@@ -128,12 +128,10 @@ public class HdfsDirectoryFactory extend
           new Object[] {slabSize, bankCount,
               ((long) bankCount * (long) slabSize)});
       
-      int _1024Size = params.getInt("solr.hdfs.blockcache.bufferstore.1024",
-          8192);
-      int _8192Size = params.getInt("solr.hdfs.blockcache.bufferstore.8192",
-          8192);
+      int bufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", 128);
+      int bufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 128
* 128);
       
-      BufferStore.init(_1024Size, _8192Size, metrics);
+      BufferStore.initNewBuffer(bufferSize, bufferCount);
       long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank
           * (long) blockSize;
       try {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java Mon Jan 27 17:03:09 2014
@@ -138,33 +138,34 @@ public class BlockDirectory extends Dire
   }
   
   static class CachedIndexInput extends CustomBufferedIndexInput {
-    
-    private IndexInput _source;
-    private int _blockSize;
-    private long _fileLength;
-    private String _cacheName;
-    private Cache _cache;
+    private final Store store;
+    private IndexInput source;
+    private final int blockSize;
+    private final long fileLength;
+    private final String cacheName;
+    private final Cache cache;
     
     public CachedIndexInput(IndexInput source, int blockSize, String name,
         String cacheName, Cache cache, int bufferSize) {
       super(name, bufferSize);
-      _source = source;
-      _blockSize = blockSize;
-      _fileLength = source.length();
-      _cacheName = cacheName;
-      _cache = cache;
+      this.source = source;
+      this.blockSize = blockSize;
+      fileLength = source.length();
+      this.cacheName = cacheName;
+      this.cache = cache;
+      store = BufferStore.instance(blockSize);
     }
     
     @Override
     public IndexInput clone() {
       CachedIndexInput clone = (CachedIndexInput) super.clone();
-      clone._source = (IndexInput) _source.clone();
+      clone.source = (IndexInput) source.clone();
       return clone;
     }
     
     @Override
     public long length() {
-      return _source.length();
+      return source.length();
     }
     
     @Override
@@ -186,7 +187,7 @@ public class BlockDirectory extends Dire
       // read whole block into cache and then provide needed data
       long blockId = getBlock(position);
       int blockOffset = (int) getPosition(position);
-      int lengthToReadInBlock = Math.min(len, _blockSize - blockOffset);
+      int lengthToReadInBlock = Math.min(len, blockSize - blockOffset);
       if (checkCache(blockId, blockOffset, b, off, lengthToReadInBlock)) {
         return lengthToReadInBlock;
       } else {
@@ -199,25 +200,25 @@ public class BlockDirectory extends Dire
     private void readIntoCacheAndResult(long blockId, int blockOffset,
         byte[] b, int off, int lengthToReadInBlock) throws IOException {
       long position = getRealPosition(blockId, 0);
-      int length = (int) Math.min(_blockSize, _fileLength - position);
-      _source.seek(position);
+      int length = (int) Math.min(blockSize, fileLength - position);
+      source.seek(position);
       
-      byte[] buf = BufferStore.takeBuffer(_blockSize);
-      _source.readBytes(buf, 0, length);
+      byte[] buf = store.takeBuffer(blockSize);
+      source.readBytes(buf, 0, length);
       System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
-      _cache.update(_cacheName, blockId, 0, buf, 0, _blockSize);
-      BufferStore.putBuffer(buf);
+      cache.update(cacheName, blockId, 0, buf, 0, blockSize);
+      store.putBuffer(buf);
     }
     
     private boolean checkCache(long blockId, int blockOffset, byte[] b,
         int off, int lengthToReadInBlock) {
-      return _cache.fetch(_cacheName, blockId, blockOffset, b, off,
+      return cache.fetch(cacheName, blockId, blockOffset, b, off,
           lengthToReadInBlock);
     }
     
     @Override
     protected void closeInternal() throws IOException {
-      _source.close();
+      source.close();
     }
   }
   

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java Mon Jan 27 17:03:09 2014
@@ -19,34 +19,50 @@ package org.apache.solr.store.blockcache
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class BufferStore {
-  
-  public static Logger LOG = LoggerFactory.getLogger(BufferStore.class);
-  
-  private static BlockingQueue<byte[]> _1024 = setupBuffers(1024, 1);
-  private static BlockingQueue<byte[]> _8192 = setupBuffers(8192, 1);
-  public static AtomicLong shardBuffercacheLost = new AtomicLong();
-  public static AtomicLong shardBuffercacheAllocate1024 = new AtomicLong();
-  public static AtomicLong shardBuffercacheAllocate8192 = new AtomicLong();
-  public static AtomicLong shardBuffercacheAllocateOther = new AtomicLong();
-  
-  public static void init(int _1024Size, int _8192Size, Metrics metrics) {
-
-    LOG.info("Initializing the 1024 buffers with [{}] buffers.", _1024Size);
-    _1024 = setupBuffers(1024, _1024Size);
-    LOG.info("Initializing the 8192 buffers with [{}] buffers.", _8192Size);
-    _8192 = setupBuffers(8192, _8192Size);
-    shardBuffercacheLost = metrics.shardBuffercacheLost;
-    shardBuffercacheAllocate1024 = metrics.shardBuffercacheAllocate1024;
-    shardBuffercacheAllocate8192 = metrics.shardBuffercacheAllocate8192;
-    shardBuffercacheAllocateOther = metrics.shardBuffercacheAllocateOther;
+public class BufferStore implements Store {
+
+  private static final Store EMPTY = new Store() {
+
+    @Override
+    public byte[] takeBuffer(int bufferSize) {
+      return new byte[bufferSize];
+    }
+
+    @Override
+    public void putBuffer(byte[] buffer) {
+    }
+  };
+
+  private final static ConcurrentMap<Integer, BufferStore> bufferStores = new ConcurrentHashMap<Integer,
BufferStore>();
+
+  private final BlockingQueue<byte[]> buffers;
+
+  private final int bufferSize;
+
+  public synchronized static void initNewBuffer(int bufferSize, long totalAmount) {
+    if (totalAmount == 0) {
+      return;
+    }
+    BufferStore bufferStore = bufferStores.get(bufferSize);
+    if (bufferStore == null) {
+      long count = totalAmount / bufferSize;
+      if (count > Integer.MAX_VALUE) {
+        count = Integer.MAX_VALUE;
+      }
+      BufferStore store = new BufferStore(bufferSize, (int) count);
+      bufferStores.put(bufferSize, store);
+    }
+  }
+
+  private BufferStore(int bufferSize, int count) {
+    this.bufferSize = bufferSize;
+    buffers = setupBuffers(bufferSize, count);
   }
-  
+
   private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count) {
     BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
     for (int i = 0; i < count; i++) {
@@ -54,57 +70,44 @@ public class BufferStore {
     }
     return queue;
   }
-  
-  public static byte[] takeBuffer(int bufferSize) {
-    switch (bufferSize) {
-      case 1024:
-        return newBuffer1024(_1024.poll());
-      case 8192:
-        return newBuffer8192(_8192.poll());
-      default:
-        return newBuffer(bufferSize);
+
+  public static Store instance(int bufferSize) {
+    BufferStore bufferStore = bufferStores.get(bufferSize);
+    if (bufferStore == null) {
+      return EMPTY;
     }
+    return bufferStore;
   }
-  
-  public static void putBuffer(byte[] buffer) {
+
+  @Override
+  public byte[] takeBuffer(int bufferSize) {
+    if (this.bufferSize != bufferSize) {
+      throw new RuntimeException("Buffer with length [" + bufferSize + "] does not match
buffer size of ["
+          + bufferSize + "]");
+    }
+    return newBuffer(buffers.poll());
+  }
+
+  @Override
+  public void putBuffer(byte[] buffer) {
     if (buffer == null) {
       return;
     }
-    int bufferSize = buffer.length;
-    switch (bufferSize) {
-      case 1024:
-        checkReturn(_1024.offer(buffer));
-        return;
-      case 8192:
-        checkReturn(_8192.offer(buffer));
-        return;
-    }
-  }
-  
-  private static void checkReturn(boolean offer) {
-    if (!offer) {
-      shardBuffercacheLost.incrementAndGet();
+    if (buffer.length != bufferSize) {
+      throw new RuntimeException("Buffer with length [" + buffer.length + "] does not match
buffer size of ["
+          + bufferSize + "]");
     }
+    checkReturn(buffers.offer(buffer));
   }
-  
-  private static byte[] newBuffer1024(byte[] buf) {
-    if (buf != null) {
-      return buf;
-    }
-    shardBuffercacheAllocate1024.incrementAndGet();
-    return new byte[1024];
+
+  private void checkReturn(boolean offer) {
+
   }
-  
-  private static byte[] newBuffer8192(byte[] buf) {
+
+  private byte[] newBuffer(byte[] buf) {
     if (buf != null) {
       return buf;
     }
-    shardBuffercacheAllocate8192.incrementAndGet();
-    return new byte[8192];
-  }
-  
-  private static byte[] newBuffer(int size) {
-    shardBuffercacheAllocateOther.incrementAndGet();
-    return new byte[size];
+    return new byte[bufferSize];
   }
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java Mon Jan 27 17:03:09 2014
@@ -35,6 +35,8 @@ public abstract class CustomBufferedInde
   private int bufferLength = 0; // end of valid bytes
   private int bufferPosition = 0; // next byte to read
   
+  private Store store;
+  
   @Override
   public byte readByte() throws IOException {
     if (bufferPosition >= bufferLength) refill();
@@ -49,6 +51,7 @@ public abstract class CustomBufferedInde
     super(resourceDesc);
     checkBufferSize(bufferSize);
     this.bufferSize = bufferSize;
+    this.store = BufferStore.instance(bufferSize);
   }
   
   private void checkBufferSize(int bufferSize) {
@@ -179,7 +182,7 @@ public abstract class CustomBufferedInde
     if (newLength <= 0) throw new EOFException("read past EOF");
     
     if (buffer == null) {
-      buffer = BufferStore.takeBuffer(bufferSize);
+      buffer = store.takeBuffer(bufferSize);
       seekInternal(bufferStart);
     }
     readInternal(buffer, 0, newLength);
@@ -191,7 +194,7 @@ public abstract class CustomBufferedInde
   @Override
   public final void close() throws IOException {
     closeInternal();
-    BufferStore.putBuffer(buffer);
+    store.putBuffer(buffer);
     buffer = null;
   }
   

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java Mon Jan 27 17:03:09 2014
@@ -1,6 +1,6 @@
 package org.apache.solr.store.blockcache;
 
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -38,6 +38,8 @@ public abstract class ReusedBufferedInde
   /** total length of the file */
   private long fileLength = 0;
   
+  private final Store store;
+  
   public ReusedBufferedIndexOutput() {
     this(BUFFER_SIZE);
   }
@@ -45,7 +47,8 @@ public abstract class ReusedBufferedInde
   public ReusedBufferedIndexOutput(int bufferSize) {
     checkBufferSize(bufferSize);
     this.bufferSize = bufferSize;
-    buffer = BufferStore.takeBuffer(this.bufferSize);
+    store = BufferStore.instance(bufferSize);
+    buffer = store.takeBuffer(this.bufferSize);
   }
   
   protected long getBufferStart() {
@@ -80,7 +83,7 @@ public abstract class ReusedBufferedInde
   public void close() throws IOException {
     flushBufferToCache();
     closeInternal();
-    BufferStore.putBuffer(buffer);
+    store.putBuffer(buffer);
     buffer = null;
   }
   

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java?rev=1561751&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java Mon Jan 27 17:03:09 2014
@@ -0,0 +1,26 @@
+package org.apache.solr.store.blockcache;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface Store {
+
+  byte[] takeBuffer(int bufferSize);
+
+  void putBuffer(byte[] buffer);
+
+}



Mime
View raw message