incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Finished BLUR-33, I had to add the BlurLockFactory back. It was mistakenly removed.
Date Thu, 18 Oct 2012 02:04:28 GMT
Updated Branches:
  refs/heads/lucene-4.0.0 8b4853769 -> 4c77501b5


Finished BLUR-33; I had to add the BlurLockFactory back. It was mistakenly removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4c77501b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4c77501b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4c77501b

Branch: refs/heads/lucene-4.0.0
Commit: 4c77501b5e45bf562522aa536337ca0598fd696e
Parents: dda0c7a
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Oct 17 22:02:58 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Oct 17 22:02:58 2012 -0400

----------------------------------------------------------------------
 .../indexserver/DefaultBlurIndexWarmup.java        |  101 ++++++-------
 .../indexserver/DistributedIndexServer.java        |  114 +++++++--------
 .../blur/manager/indexserver/LocalIndexServer.java |   18 +-
 .../apache/blur/store/hdfs/BlurLockFactory.java    |  102 +++++++++++++
 4 files changed, 216 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4c77501b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
index f828229..25e76e3 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -17,24 +17,13 @@ package org.apache.blur.manager.indexserver;
  * limitations under the License.
  */
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
-import org.apache.blur.thrift.generated.ColumnPreCache;
 import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.WarmUpByFieldBounds;
-import org.apache.lucene.index.WarmUpByFieldBoundsStatus;
-import org.apache.lucene.util.ReaderUtil;
-
 
 public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
 
@@ -42,48 +31,56 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
 
   @Override
   public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader
reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
-    try {
-      ColumnPreCache columnPreCache = table.columnPreCache;
-      List<String> preCacheCols = null;
-      if (columnPreCache != null) {
-        preCacheCols = columnPreCache.preCacheCols;
-      }
-      if (preCacheCols == null) {
-        LOG.info("No pre cache defined, precache all fields.");
-        FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-        preCacheCols = new ArrayList<String>();
-        for (FieldInfo fieldInfo : fieldInfos) {
-          if (fieldInfo.isIndexed) {
-            preCacheCols.add(fieldInfo.name);
-          }
-        }
-        preCacheCols.remove(BlurConstants.ROW_ID);
-        preCacheCols.remove(BlurConstants.RECORD_ID);
-        preCacheCols.remove(BlurConstants.PRIME_DOC);
-        preCacheCols.remove(BlurConstants.SUPER);
-      }
+    LOG.warn("Warm up NOT supported yet.");
+    // Once the reader warm-up has been re-implemented, this code will change
+    // accordingly.
 
-      WarmUpByFieldBounds warmUpByFieldBounds = new WarmUpByFieldBounds();
-      WarmUpByFieldBoundsStatus status = new WarmUpByFieldBoundsStatus() {
-        @Override
-        public void complete(String name, Term start, Term end, long startPosition, long
endPosition, long totalBytesRead, long nanoTime, AtomicBoolean isClosed) {
-          double bytesPerNano = totalBytesRead / (double) nanoTime;
-          double mBytesPerNano = bytesPerNano / 1024 / 1024;
-          double mBytesPerSecond = mBytesPerNano * 1000000000.0;
-          if (totalBytesRead > 0) {
-            LOG.info("Precached field [{0}] in table [{1}] shard [{2}] file [{3}], [{4}]
bytes cached at [{5} MB/s]", start.field(), table.name, shard, name, totalBytesRead,
-                mBytesPerSecond);
-          }
-        }
-      };
-      if (preCacheCols != null) {
-        for (String field : preCacheCols) {
-          warmUpByFieldBounds.warmUpByField(isClosed, new Term(field), reader, status);
-        }
-      }
-    } finally {
-      releaseReader.release();
-    }
+    // try {
+    // ColumnPreCache columnPreCache = table.columnPreCache;
+    // List<String> preCacheCols = null;
+    // if (columnPreCache != null) {
+    // preCacheCols = columnPreCache.preCacheCols;
+    // }
+    // if (preCacheCols == null) {
+    // LOG.info("No pre cache defined, precache all fields.");
+    // FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
+    // preCacheCols = new ArrayList<String>();
+    // for (FieldInfo fieldInfo : fieldInfos) {
+    // if (fieldInfo.isIndexed) {
+    // preCacheCols.add(fieldInfo.name);
+    // }
+    // }
+    // preCacheCols.remove(BlurConstants.ROW_ID);
+    // preCacheCols.remove(BlurConstants.RECORD_ID);
+    // preCacheCols.remove(BlurConstants.PRIME_DOC);
+    // preCacheCols.remove(BlurConstants.SUPER);
+    // }
+    //
+    // WarmUpByFieldBounds warmUpByFieldBounds = new WarmUpByFieldBounds();
+    // WarmUpByFieldBoundsStatus status = new WarmUpByFieldBoundsStatus() {
+    // @Override
+    // public void complete(String name, Term start, Term end, long
+    // startPosition, long endPosition, long totalBytesRead, long nanoTime,
+    // AtomicBoolean isClosed) {
+    // double bytesPerNano = totalBytesRead / (double) nanoTime;
+    // double mBytesPerNano = bytesPerNano / 1024 / 1024;
+    // double mBytesPerSecond = mBytesPerNano * 1000000000.0;
+    // if (totalBytesRead > 0) {
+    // LOG.info("Precached field [{0}] in table [{1}] shard [{2}] file [{3}], [{4}] bytes
cached at [{5} MB/s]",
+    // start.field(), table.name, shard, name, totalBytesRead,
+    // mBytesPerSecond);
+    // }
+    // }
+    // };
+    // if (preCacheCols != null) {
+    // for (String field : preCacheCols) {
+    // warmUpByFieldBounds.warmUpByField(isClosed, new Term(field), reader,
+    // status);
+    // }
+    // }
+    // } finally {
+    // releaseReader.release();
+    // }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4c77501b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 65da0a0..4d22c5c 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -20,7 +20,6 @@ import static org.apache.blur.utils.BlurConstants.SHARD_PREFIX;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -56,9 +55,8 @@ import org.apache.blur.manager.writer.DirectoryReferenceFileGC;
 import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.Cache;
-import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
+import org.apache.blur.store.hdfs.BlurLockFactory;
 import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.store.lock.BlurLockFactory;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
@@ -71,15 +69,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.TermDocs;
-import org.apache.lucene.index.TermPositions;
-import org.apache.lucene.search.Similarity;
+import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.ReaderUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -269,18 +262,19 @@ public class DistributedIndexServer extends AbstractIndexServer {
       }
 
       private void updateMetrics(BlurMetrics blurMetrics, Map<String, BlurIndex> indexes,
AtomicLong segmentCount, AtomicLong indexMemoryUsage) throws IOException {
-        for (BlurIndex index : indexes.values()) {
-          IndexReader reader = index.getIndexReader();
-          try {
-            IndexReader[] readers = reader.getSequentialSubReaders();
-            if (readers != null) {
-              segmentCount.addAndGet(readers.length);
-            }
-            indexMemoryUsage.addAndGet(BlurUtil.getMemoryUsage(reader));
-          } finally {
-            reader.decRef();
-          }
-        }
+        // @TODO not sure how to do this yet
+//        for (BlurIndex index : indexes.values()) {
+//          IndexReader reader = index.getIndexReader();
+//          try {
+//            IndexReader[] readers = reader.getSequentialSubReaders();
+//            if (readers != null) {
+//              segmentCount.addAndGet(readers.length);
+//            }
+//            indexMemoryUsage.addAndGet(BlurUtil.getMemoryUsage(reader));
+//          } finally {
+//            reader.decRef();
+//          }
+//        }
       }
     }, _delay, _delay);
   }
@@ -478,20 +472,21 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
     BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName,
BlurConstants.getPid());
 
-    Directory directory = new HdfsDirectory(hdfsDirPath);
+    Directory directory = new HdfsDirectory(_configuration, hdfsDirPath);
     directory.setLockFactory(lockFactory);
 
     TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
     String compressionClass = descriptor.compressionClass;
     int compressionBlockSize = descriptor.compressionBlockSize;
     if (compressionClass != null) {
-      CompressionCodec compressionCodec;
-      try {
-        compressionCodec = BlurUtil.getInstance(compressionClass, CompressionCodec.class);
-        directory = new CompressedFieldDataDirectory(directory, compressionCodec, compressionBlockSize);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
+      throw new RuntimeException("Not supported yet");
+//      CompressionCodec compressionCodec;
+//      try {
+//        compressionCodec = BlurUtil.getInstance(compressionClass, CompressionCodec.class);
+//        directory = new CompressedFieldDataDirectory(directory, compressionCodec, compressionBlockSize);
+//      } catch (Exception e) {
+//        throw new IOException(e);
+//      }
     }
 
     Directory dir;
@@ -553,35 +548,38 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private void warmUpAllSegments(IndexReader reader) throws IOException {
-    IndexReader[] indexReaders = reader.getSequentialSubReaders();
-    if (indexReaders != null) {
-      for (IndexReader r : indexReaders) {
-        warmUpAllSegments(r);
-      }
-    }
-    int maxDoc = reader.maxDoc();
-    int numDocs = reader.numDocs();
-    FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-    Collection<String> fieldNames = new ArrayList<String>();
-    for (FieldInfo fieldInfo : fieldInfos) {
-      if (fieldInfo.isIndexed) {
-        fieldNames.add(fieldInfo.name);
-      }
-    }
-    int primeDocCount = reader.docFreq(BlurConstants.PRIME_DOC_TERM);
-    TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
-    termDocs.next();
-    termDocs.close();
-
-    TermPositions termPositions = reader.termPositions(BlurConstants.PRIME_DOC_TERM);
-    if (termPositions.next()) {
-      if (termPositions.freq() > 0) {
-        termPositions.nextPosition();
-      }
-    }
-    termPositions.close();
-    LOG.info("Warmup of indexreader [" + reader + "] complete, maxDocs [" + maxDoc + "],
numDocs [" + numDocs + "], primeDocumentCount [" + primeDocCount + "], fieldCount ["
-        + fieldNames.size() + "]");
+    LOG.warn("Warm up NOT supported yet.");
+    //Once the reader warm-up has been re-implemented, this code will change accordingly.
+    
+//    IndexReader[] indexReaders = reader.getSequentialSubReaders();
+//    if (indexReaders != null) {
+//      for (IndexReader r : indexReaders) {
+//        warmUpAllSegments(r);
+//      }
+//    }
+//    int maxDoc = reader.maxDoc();
+//    int numDocs = reader.numDocs();
+//    FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
+//    Collection<String> fieldNames = new ArrayList<String>();
+//    for (FieldInfo fieldInfo : fieldInfos) {
+//      if (fieldInfo.isIndexed) {
+//        fieldNames.add(fieldInfo.name);
+//      }
+//    }
+//    int primeDocCount = reader.docFreq(BlurConstants.PRIME_DOC_TERM);
+//    TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
+//    termDocs.next();
+//    termDocs.close();
+//
+//    TermPositions termPositions = reader.termPositions(BlurConstants.PRIME_DOC_TERM);
+//    if (termPositions.next()) {
+//      if (termPositions.freq() > 0) {
+//        termPositions.nextPosition();
+//      }
+//    }
+//    termPositions.close();
+//    LOG.info("Warmup of indexreader [" + reader + "] complete, maxDocs [" + maxDoc + "],
numDocs [" + numDocs + "], primeDocumentCount [" + primeDocCount + "], fieldCount ["
+//        + fieldNames.size() + "]");
   }
 
   private synchronized Map<String, BlurIndex> openMissingShards(final String table,
Set<String> shardsToServe, final Map<String, BlurIndex> tableIndexes) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4c77501b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
index d5ee79f..041eb2a 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -16,7 +16,7 @@ package org.apache.blur.manager.indexserver;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,21 +38,21 @@ import org.apache.blur.lucene.search.FairSimilarity;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurNRTIndex;
-import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.analysis.util.CharArraySet;
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.search.Similarity;
+import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MMapDirectory;
 
-
 public class LocalIndexServer extends AbstractIndexServer {
 
   private final static Log LOG = LogFactory.getLog(LocalIndexServer.class);
@@ -61,7 +61,7 @@ public class LocalIndexServer extends AbstractIndexServer {
   private File _localDir;
   private BlurIndexCloser _closer;
   private int _blockSize = 65536;
-  private CompressionCodec _compression = CompressedFieldDataDirectory.DEFAULT_COMPRESSION;
+  private CompressionCodec _compression = new DefaultCodec();
   private Path _walPath;
   private Configuration _configuration = new Configuration();
 
@@ -75,7 +75,7 @@ public class LocalIndexServer extends AbstractIndexServer {
 
   @Override
   public BlurAnalyzer getAnalyzer(String table) {
-    return new BlurAnalyzer(new StandardAnalyzer(LUCENE_VERSION, new HashSet<String>()));
+    return new BlurAnalyzer(new StandardAnalyzer(LUCENE_VERSION, new CharArraySet(LUCENE_VERSION,
new HashSet<String>(), false)));
   }
 
   @Override
@@ -130,7 +130,7 @@ public class LocalIndexServer extends AbstractIndexServer {
       for (File f : tableFile.listFiles()) {
         if (f.isDirectory()) {
           MMapDirectory directory = new MMapDirectory(f);
-          if (!IndexReader.indexExists(directory)) {
+          if (!DirectoryReader.indexExists(directory)) {
             new IndexWriter(directory, new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer())).close();
           }
           String shardName = f.getName();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4c77501b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
new file mode 100644
index 0000000..b1c3930
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
@@ -0,0 +1,102 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * Lucene {@link LockFactory} that keeps its lock files in a Hadoop
+ * {@link FileSystem} directory.
+ * <p>
+ * {@link Lock#obtain()} unconditionally (re)creates the lock file, overwriting
+ * any existing one, and writes a key of the form
+ * {@code host/pid/currentTimeMillis} into it; it never waits for or refuses an
+ * existing lock. {@link Lock#isLocked()} returns true only when this particular
+ * lock instance obtained the lock and the file still holds the key it wrote.
+ * This lets an owner detect that another process has since taken the lock over.
+ */
+public class BlurLockFactory extends LockFactory {
+
+  private final Configuration _configuration;
+  private final FileSystem _fileSystem;
+  // "host/pid" prefix shared by every lock created by this factory.
+  private final String _baseLockKey;
+  private final Path _dir;
+
+  /**
+   * @param configuration Hadoop configuration used to resolve the file system of {@code dir}
+   * @param dir directory in which lock files are created
+   * @param host name of this node, embedded in the lock key
+   * @param pid process id of this node, embedded in the lock key
+   * @throws IOException if the file system for {@code dir} cannot be obtained
+   */
+  public BlurLockFactory(Configuration configuration, Path dir, String host, int pid) throws IOException {
+    _configuration = configuration;
+    _dir = dir;
+    _fileSystem = _dir.getFileSystem(_configuration);
+    _baseLockKey = host + "/" + pid;
+  }
+
+  /**
+   * Creates a lock backed by the file {@code lockName} inside this factory's directory.
+   */
+  @Override
+  public Lock makeLock(String lockName) {
+    final Path lockPath = new Path(_dir, lockName);
+    return new Lock() {
+      // True once obtain() has been attempted; a lock may only be obtained once.
+      private boolean _set;
+      // Key written by this lock's obtain(). It is kept per lock (not per factory)
+      // so that obtain() on another lock from the same factory cannot make this
+      // lock compare the file against a key it never wrote.
+      private byte[] _lockKey;
+
+      /**
+       * Writes this lock's key into the lock file, replacing any existing file.
+       *
+       * @return always true
+       * @throws IOException if this lock was already obtained or the file cannot be written
+       */
+      @Override
+      public boolean obtain() throws IOException {
+        if (_set) {
+          throw new IOException("Lock for [" + _baseLockKey + "] can only be set once.");
+        }
+        try {
+          _lockKey = (_baseLockKey + "/" + System.currentTimeMillis()).getBytes("UTF-8");
+          // overwrite == true: the most recent obtain() always wins the file.
+          FSDataOutputStream outputStream = _fileSystem.create(lockPath, true);
+          try {
+            outputStream.write(_lockKey);
+          } finally {
+            outputStream.close();
+          }
+        } finally {
+          _set = true;
+        }
+        return true;
+      }
+
+      /**
+       * Deletes the lock file without checking whether this lock still owns it.
+       */
+      @Override
+      public void release() throws IOException {
+        _fileSystem.delete(lockPath, false);
+      }
+
+      /**
+       * @return true only if this lock was obtained and the lock file still
+       *         contains exactly the key this lock wrote
+       */
+      @Override
+      public boolean isLocked() throws IOException {
+        if (!_set || _lockKey == null) {
+          return false;
+        }
+        if (!_fileSystem.exists(lockPath)) {
+          return false;
+        }
+        FileStatus fileStatus = _fileSystem.getFileStatus(lockPath);
+        if (fileStatus.getLen() != _lockKey.length) {
+          // A different length means another owner rewrote the file.
+          return false;
+        }
+        byte[] buf = new byte[_lockKey.length];
+        FSDataInputStream inputStream = _fileSystem.open(lockPath);
+        try {
+          inputStream.readFully(buf);
+        } finally {
+          inputStream.close();
+        }
+        return Arrays.equals(_lockKey, buf);
+      }
+    };
+  }
+
+  /**
+   * Deletes the named lock file (non-recursively), regardless of who holds it.
+   */
+  @Override
+  public void clearLock(String lockName) throws IOException {
+    _fileSystem.delete(new Path(_dir, lockName), false);
+  }
+}
\ No newline at end of file


Mime
View raw message