geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject incubator-geode git commit: GEODE-11: create index repository using raw Lucene directory.
Date Thu, 01 Sep 2016 00:11:42 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 07798ca80 -> 746820bb1


GEODE-11: create index repository using raw Lucene directory.

GEODE-11: add RawDirectory using index instance

GEODE-11: add abstract class for index and repositoryManager

remove commented lines in test code


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/746820bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/746820bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/746820bb

Branch: refs/heads/develop
Commit: 746820bb18fe3f3ab7bd6b00f847821d934bbf09
Parents: 07798ca
Author: zhouxh <gzhou@pivotal.io>
Authored: Fri Jul 15 15:57:07 2016 -0700
Committer: zhouxh <gzhou@pivotal.io>
Committed: Wed Aug 31 17:09:08 2016 -0700

----------------------------------------------------------------------
 .../AbstractPartitionedRepositoryManager.java   | 124 +++++++++++++++++
 .../lucene/internal/IndexRepositoryFactory.java |  21 ++-
 .../lucene/internal/LuceneIndexFactory.java     |  30 +++++
 .../LuceneIndexForPartitionedRegion.java        | 134 +++++++------------
 .../cache/lucene/internal/LuceneIndexImpl.java  |  79 +++++++++--
 .../cache/lucene/internal/LuceneRawIndex.java   |  43 ++++++
 .../lucene/internal/LuceneRawIndexFactory.java  |  27 ++++
 .../lucene/internal/LuceneServiceImpl.java      |   3 +-
 .../internal/PartitionedRepositoryManager.java  | 123 ++---------------
 .../internal/RawIndexRepositoryFactory.java     |  63 +++++++++
 .../internal/RawLuceneRepositoryManager.java    |  46 +++++++
 .../repository/IndexRepositoryImpl.java         |   6 +-
 .../LuceneIndexCreationIntegrationTest.java     |  29 ++++
 .../cache/lucene/LuceneQueriesPRBase.java       |   7 +-
 .../LuceneIndexForPartitionedRegionTest.java    |  34 ++++-
 .../LuceneIndexRecoveryHAIntegrationTest.java   |  19 +--
 .../PartitionedRepositoryManagerJUnitTest.java  |  68 +++++-----
 .../RawLuceneRepositoryManagerJUnitTest.java    |  97 ++++++++++++++
 .../DistributedScoringJUnitTest.java            |   2 +-
 .../IndexRepositoryImplJUnitTest.java           |   2 +-
 .../IndexRepositoryImplPerformanceTest.java     |   2 +-
 .../cache/lucene/test/IndexRepositorySpy.java   |  20 +--
 22 files changed, 689 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
new file mode 100755
index 0000000..1dc716c
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+
+public abstract class AbstractPartitionedRepositoryManager implements RepositoryManager {
+
+  /**
+   * Map of user bucket id to the index repository serving that bucket.
+   *
+   * A repository that reports isClosed() is treated as stale: getRepository(Integer)
+   * calls cleanup() on it and replaces it with a newly created repository.
+   */
+  protected final ConcurrentHashMap<Integer, IndexRepository> indexRepositories = new ConcurrentHashMap<>();
+
+  /** The user region for this index */
+  protected final PartitionedRegion userRegion;
+  protected final LuceneSerializer serializer;
+  protected final LuceneIndexImpl index;
+
+  /** Resolves the user region by looking up the index's region path in the index's cache. */
+  public AbstractPartitionedRepositoryManager(
+      LuceneIndexImpl index,
+      LuceneSerializer serializer) {
+    this.index = index;
+    this.userRegion = (PartitionedRegion)index.getCache().getRegion(index.getRegionPath());
+    this.serializer = serializer;
+  }
+
+  /** Returns the repository of the user bucket that the key and callback argument resolve to. */
+  @Override
+  public IndexRepository getRepository(Region region, Object key,
+      Object callbackArg) throws BucketNotFoundException {
+    BucketRegion userBucket = userRegion.getBucketRegion(key, callbackArg);
+    if(userBucket == null) {
+      throw new BucketNotFoundException("User bucket was not found for region " + region + " key " + key + " callbackarg " + callbackArg);
+    }
+
+    return getRepository(userBucket.getId());
+  }
+
+  /** Returns one repository for each local bucket of the function context's data set. */
+  @Override
+  public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx) throws BucketNotFoundException {
+    Region<Object, Object> region = ctx.getDataSet();
+    Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region);
+    ArrayList<IndexRepository> repos = new ArrayList<>(buckets.size());
+    for(Integer bucketId : buckets) {
+      BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId);
+      if(userBucket == null) {
+        throw new BucketNotFoundException("User bucket was not found for region " + region + " bucket id " + bucketId);
+      }
+      repos.add(getRepository(userBucket.getId()));
+    }
+
+    return repos;
+  }
+
+  /** Creates the index repository for one bucket; a null result is reported as bucket-not-found. */
+  public abstract IndexRepository createOneIndexRepository(final Integer bucketId,
+      LuceneSerializer serializer,
+      LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException;
+
+  /**
+   * Return the repository for a given user bucket. A missing or closed repository is
+   * (re)created inside compute(); a null creation result raises BucketNotFoundException.
+   */
+  protected IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException {
+    IndexRepository repo = indexRepositories.get(bucketId);
+    if(repo != null && !repo.isClosed()) {
+      return repo;
+    }
+
+    repo = indexRepositories.compute(bucketId, (key, oldRepository) -> {
+      if(oldRepository != null && !oldRepository.isClosed()) {
+        return oldRepository;
+      }
+      if(oldRepository != null) {
+        oldRepository.cleanup();
+      }
+      try {
+        return createOneIndexRepository(bucketId, serializer, index, userRegion);
+      } catch(IOException e) {
+        throw new InternalGemFireError("Unable to create index repository", e);
+      }
+    });
+
+    if(repo == null) {
+      throw new BucketNotFoundException("Colocated index buckets not found for bucket id " + bucketId);
+    }
+
+    return repo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
index ae4b88b..e6f01b0 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
@@ -19,7 +19,6 @@ package com.gemstone.gemfire.cache.lucene.internal;
 import java.io.IOException;
 
 import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
-import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
@@ -27,7 +26,6 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 
-import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 
@@ -37,25 +35,22 @@ public class IndexRepositoryFactory {
   }
 
   public IndexRepository createIndexRepository(final Integer bucketId,
-                                        PartitionedRegion userRegion,
-                                        PartitionedRegion fileRegion,
-                                        PartitionedRegion chunkRegion,
                                         LuceneSerializer serializer,
-                                        Analyzer analyzer,
-                                        LuceneIndexStats indexStats,
-                                        FileSystemStats fileSystemStats)
+                                        LuceneIndexImpl index, PartitionedRegion userRegion)
     throws IOException
   {
     final IndexRepository repo;
-    BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
-    BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
+    LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion)index; 
+    BucketRegion fileBucket = getMatchingBucket(indexForPR.getFileRegion(), bucketId);
+    BucketRegion chunkBucket = getMatchingBucket(indexForPR.getChunkRegion(), bucketId);
+    BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
     if(fileBucket == null || chunkBucket == null) {
       return null;
     }
-    RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, fileSystemStats);
-    IndexWriterConfig config = new IndexWriterConfig(analyzer);
+    RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, indexForPR.getFileSystemStats());
+    IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
     IndexWriter writer = new IndexWriter(dir, config);
-    repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexStats);
+    repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexForPR.getIndexStats(), dataBucket);
     return repo;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java
new file mode 100755
index 0000000..b6ac867
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/** Default index factory; subclasses may override create() to supply other index types. */
+public class LuceneIndexFactory {
+  /** Builds a LuceneIndexForPartitionedRegion from the given index name, region path and cache. */
+  public LuceneIndexImpl create(String indexName, String regionPath, GemFireCacheImpl cache) {
+    final LuceneIndexImpl partitionedIndex = new LuceneIndexForPartitionedRegion(indexName, regionPath, cache);
+    return partitionedIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index af05e7d..b64e026 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -35,109 +35,77 @@ import com.gemstone.gemfire.cache.lucene.internal.directory.DumpDirectoryFiles;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 
 /* wrapper of IndexWriter */
 public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
+  protected Region<String, File> fileRegion;
+  protected Region<ChunkKey, byte[]> chunkRegion;
+  protected final FileSystemStats fileSystemStats;
 
   public LuceneIndexForPartitionedRegion(String indexName, String regionPath, Cache cache) {
     super(indexName, regionPath, cache);
+
+    final String statsName = indexName + "-" + regionPath;
+    this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName);
   }
 
-  @Override
-  public void initialize() {
-    if (!hasInitialized) {
-      /* create index region */
-      PartitionedRegion dataRegion = getDataRegion();
-      //assert dataRegion != null;
-      RegionAttributes regionAttributes = dataRegion.getAttributes();
-      DataPolicy dp = regionAttributes.getDataPolicy();
-      final boolean withPersistence = dp.withPersistence();
-      final boolean withStorage = regionAttributes.getPartitionAttributes().getLocalMaxMemory()>0;
-      RegionShortcut regionShortCut;
-      if (withPersistence) {
-        // TODO: add PartitionedRegionAttributes instead
-        regionShortCut = RegionShortcut.PARTITION_PERSISTENT;
-      } else {
-        regionShortCut = RegionShortcut.PARTITION;
-      }
-
-      // TODO: 1) dataRegion should be withStorage
-      //       2) Persistence to Persistence
-      //       3) Replicate to Replicate, Partition To Partition
-      //       4) Offheap to Offheap
-      if (!withStorage) {
-        throw new IllegalStateException("The data region to create lucene index should be with storage");
-      }
-
-      // create PR fileRegion, but not to create its buckets for now
-      final String fileRegionName = createFileRegionName();
-      PartitionAttributes partitionAttributes = dataRegion.getPartitionAttributes();
-      if (!fileRegionExists(fileRegionName)) {
-        fileRegion = createFileRegion(regionShortCut, fileRegionName, partitionAttributes, regionAttributes);
-      }
-
-      // create PR chunkRegion, but not to create its buckets for now
-      final String chunkRegionName = createChunkRegionName();
-      if (!chunkRegionExists(chunkRegionName)) {
-        chunkRegion = createChunkRegion(regionShortCut, fileRegionName, partitionAttributes, chunkRegionName, regionAttributes);
-      }
-      fileSystemStats.setFileSupplier(() -> (int) getFileRegion().getLocalSize());
-      fileSystemStats.setChunkSupplier(() -> (int) getChunkRegion().getLocalSize());
-      fileSystemStats.setBytesSupplier(() -> getChunkRegion().getPrStats().getDataStoreBytesInUse());
-
-      // we will create RegionDirectories on the fly when data comes in
-      HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames());
-      repositoryManager = new PartitionedRepositoryManager(dataRegion, (PartitionedRegion) fileRegion,
-        (PartitionedRegion) chunkRegion, mapper, analyzer, this.indexStats, this.fileSystemStats);
-      
-      // create AEQ, AEQ listener and specify the listener to repositoryManager
-      createAEQ(dataRegion);
-
-      addExtension(dataRegion);
-      hasInitialized = true;
+  protected RepositoryManager createRepositoryManager() {
+    RegionShortcut regionShortCut;
+    final boolean withPersistence = withPersistence(); 
+    RegionAttributes regionAttributes = dataRegion.getAttributes();
+    final boolean withStorage = regionAttributes.getPartitionAttributes().getLocalMaxMemory()>0;
+
+    // TODO: 1) dataRegion should be withStorage
+    //       2) Persistence to Persistence
+    //       3) Replicate to Replicate, Partition To Partition
+    //       4) Offheap to Offheap
+    if (!withStorage) {
+      throw new IllegalStateException("The data region to create lucene index should be with storage");
+    }
+    if (withPersistence) {
+      // TODO: add PartitionedRegionAttributes instead
+      regionShortCut = RegionShortcut.PARTITION_PERSISTENT;
+    } else {
+      regionShortCut = RegionShortcut.PARTITION;
+    }
+    
+    // create PR fileRegion, but not to create its buckets for now
+    final String fileRegionName = createFileRegionName();
+    PartitionAttributes partitionAttributes = dataRegion.getPartitionAttributes();
+    if (!fileRegionExists(fileRegionName)) {
+      fileRegion = createFileRegion(regionShortCut, fileRegionName, partitionAttributes, regionAttributes);
     }
-  }
 
-  private PartitionedRegion getDataRegion() {
-    return (PartitionedRegion) cache.getRegion(regionPath);
-  }
+    // create PR chunkRegion, but not to create its buckets for now
+    final String chunkRegionName = createChunkRegionName();
+    if (!chunkRegionExists(chunkRegionName)) {
+      chunkRegion = createChunkRegion(regionShortCut, fileRegionName, partitionAttributes, chunkRegionName, regionAttributes);
+    }
+    fileSystemStats.setFileSupplier(() -> (int) getFileRegion().getLocalSize());
+    fileSystemStats.setChunkSupplier(() -> (int) getChunkRegion().getLocalSize());
+    fileSystemStats.setBytesSupplier(() -> getChunkRegion().getPrStats().getDataStoreBytesInUse());
 
-  private PartitionedRegion getFileRegion() {
+    // we will create RegionDirectories on the fly when data comes in
+    HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames());
+    return new PartitionedRepositoryManager(this, mapper);
+  }
+  
+  public PartitionedRegion getFileRegion() {
     return (PartitionedRegion) fileRegion;
   }
 
-  private PartitionedRegion getChunkRegion() {
+  public PartitionedRegion getChunkRegion() {
     return (PartitionedRegion) chunkRegion;
   }
 
-  private AsyncEventQueueFactoryImpl createAEQFactory(final Region dataRegion) {
-    AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
-    factory.setParallel(true); // parallel AEQ for PR
-    factory.setMaximumQueueMemory(1000);
-    factory.setDispatcherThreads(1);
-    factory.setIsMetaQueue(true);
-    if(dataRegion.getAttributes().getDataPolicy().withPersistence()) {
-      factory.setPersistent(true);
-    }
-    factory.setDiskStoreName(dataRegion.getAttributes().getDiskStoreName());
-    factory.setDiskSynchronous(dataRegion.getAttributes().isDiskSynchronous());
-    factory.setForwardExpirationDestroy(true);
-    return factory;
-  }
-
-  AsyncEventQueue createAEQ(Region dataRegion) {
-    return createAEQ(createAEQFactory(dataRegion));
-  }
-
-  private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory) {
-    LuceneEventListener listener = new LuceneEventListener(repositoryManager);
-    String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);
-    AsyncEventQueue indexQueue = factory.create(aeqId, listener);
-    return indexQueue;
+  public FileSystemStats getFileSystemStats() {
+    return fileSystemStats;
   }
-
+  
   boolean fileRegionExists(String fileRegionName) {
     return cache.<String, File> getRegion(fileRegionName) != null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
index ff31c49..67461a9 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
@@ -29,9 +29,11 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
 
 import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
@@ -39,6 +41,7 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.xml.LuceneIndexCreation;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
@@ -50,24 +53,21 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
   protected final String regionPath;
   protected final Cache cache;
   protected final LuceneIndexStats indexStats;
-  protected final FileSystemStats fileSystemStats;
 
   protected boolean hasInitialized = false;
   protected Map<String, Analyzer> fieldAnalyzers;
   protected String[] searchableFieldNames;
   protected RepositoryManager repositoryManager;
   protected Analyzer analyzer;
-  protected Region<String, File> fileRegion;
-  protected Region<ChunkKey, byte[]> chunkRegion;
-
+  protected LocalRegion dataRegion;
 
   protected LuceneIndexImpl(String indexName, String regionPath, Cache cache) {
     this.indexName = indexName;
     this.regionPath = regionPath;
     this.cache = cache;
+    
     final String statsName = indexName + "-" + regionPath;
     this.indexStats = new LuceneIndexStats(cache.getDistributedSystem(), statsName);
-    this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName);
   }
 
   @Override
@@ -79,6 +79,17 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
   public String getRegionPath() {
     return this.regionPath;
   }
+ 
+  protected LocalRegion getDataRegion() {
+    return (LocalRegion)cache.getRegion(regionPath);
+  }
+
+  protected boolean withPersistence() {
+    RegionAttributes ra = dataRegion.getAttributes();
+    DataPolicy dp = ra.getDataPolicy();
+    final boolean withPersistence = dp.withPersistence();
+    return withPersistence;
+  }
   
   protected void setSearchableFields(String[] fields) {
     searchableFieldNames = fields;
@@ -135,6 +146,10 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     return this.analyzer;
   }
 
+  public Cache getCache() {
+    return this.cache;
+  }
+  
   public void setFieldAnalyzers(Map<String, Analyzer> fieldAnalyzers) {
     this.fieldAnalyzers = fieldAnalyzers == null ? null : Collections.unmodifiableMap(fieldAnalyzers);
   }
@@ -143,17 +158,59 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     return indexStats;
   }
 
-  public FileSystemStats getFileSystemStats() {
-    return fileSystemStats;
-  }
+  protected void initialize() {
+    if (!hasInitialized) {
+      /* create index region */
+      dataRegion = getDataRegion();
+      //assert dataRegion != null;
+
+      repositoryManager = createRepositoryManager();
+      
+      // create AEQ, AEQ listener and specify the listener to repositoryManager
+      createAEQ(dataRegion);
 
-  protected abstract void initialize();
+      addExtension(dataRegion);
+      hasInitialized = true;
+    }
+  }
   
-  /**
+  protected abstract RepositoryManager createRepositoryManager();
+  
+  protected AsyncEventQueue createAEQ(Region dataRegion) {
+    return createAEQ(createAEQFactory(dataRegion));
+  }
+
+  private AsyncEventQueueFactoryImpl createAEQFactory(final Region dataRegion) {
+    AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
+    if (dataRegion instanceof PartitionedRegion) {
+      factory.setParallel(true); // parallel AEQ for PR
+    } else {
+      factory.setParallel(false); // TODO: not sure if serial AEQ working or not
+    }
+    factory.setMaximumQueueMemory(1000);
+    factory.setDispatcherThreads(10);
+    factory.setIsMetaQueue(true);
+    if (dataRegion.getAttributes().getDataPolicy().withPersistence()) {
+      factory.setPersistent(true);
+    }
+    factory.setDiskStoreName(dataRegion.getAttributes().getDiskStoreName());
+    factory.setDiskSynchronous(dataRegion.getAttributes().isDiskSynchronous());
+    factory.setForwardExpirationDestroy(true);
+    return factory;
+  }
+
+  private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory) {
+    LuceneEventListener listener = new LuceneEventListener(repositoryManager);
+    String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);
+    AsyncEventQueue indexQueue = factory.create(aeqId, listener);
+    return indexQueue;
+  }
+
+  /**
    * Register an extension with the region
    * so that xml will be generated for this index.
    */
-  protected void addExtension(PartitionedRegion dataRegion) {
+  protected void addExtension(LocalRegion dataRegion) {
     LuceneIndexCreation creation = new LuceneIndexCreation();
     creation.setName(this.getName());
     creation.addFieldNames(this.getFieldNames());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java
new file mode 100755
index 0000000..e708691
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+/** Index flavour whose repositories are managed by a RawLuceneRepositoryManager. */
+public class LuceneRawIndex extends LuceneIndexImpl {
+
+  protected LuceneRawIndex(String indexName, String regionPath, Cache cache) {
+    super(indexName, regionPath, cache);
+  }
+
+  /** Pairs this index with a serializer built from its field names. */
+  @Override
+  protected RepositoryManager createRepositoryManager() {
+    return new RawLuceneRepositoryManager(this, new HeterogeneousLuceneSerializer(getFieldNames()));
+  }
+
+  /** Intentionally a no-op: the directory argument is ignored and nothing is written. */
+  @Override
+  public void dumpFiles(String directory) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java
new file mode 100755
index 0000000..6c3bad6
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+public class LuceneRawIndexFactory extends LuceneIndexFactory { // factory variant whose create() yields a LuceneRawIndex
+  public LuceneIndexImpl create(String indexName, String regionPath, GemFireCacheImpl cache) { // NOTE(review): presumably overrides LuceneIndexFactory.create - confirm
+    return new LuceneRawIndex(indexName, regionPath, cache); // all three arguments are forwarded unchanged
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java
index 82aee8b..29a8e62 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java
@@ -67,6 +67,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
  * @since GemFire 8.5
  */
 public class LuceneServiceImpl implements InternalLuceneService {
+  public static LuceneIndexFactory luceneIndexFactory = new LuceneIndexFactory();
   private static final Logger logger = LogService.getLogger();
 
   private GemFireCacheImpl cache;
@@ -225,7 +226,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
     }
     //Convert the region name into a canonical form
     regionPath = dataregion.getFullPath();
-    return new LuceneIndexForPartitionedRegion(indexName, regionPath, cache);
+    return luceneIndexFactory.create(indexName, regionPath, cache);
   }
 
   private void registerDefinedIndex(final String regionAndIndex, final LuceneIndexCreationProfile luceneIndexCreationProfile) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
index 3cc713b..d5dd7b1 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -20,25 +20,10 @@
 package com.gemstone.gemfire.cache.lucene.internal;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.lucene.analysis.Analyzer;
-
-import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
-import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
-import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
-import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap;
 
 /**
  * Manages index repositories for partitioned regions.
@@ -47,111 +32,19 @@ import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap;
  * bucket. If a Bucket is rebalanced, this class will create a new
  * index repository when the bucket returns to this node.
  */
-public class PartitionedRepositoryManager implements RepositoryManager {
+public class PartitionedRepositoryManager extends AbstractPartitionedRepositoryManager {
 
   public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory();
 
-  /** map of the parent bucket region to the index repository
-   * 
-   * This is based on the BucketRegion in case a bucket is rebalanced, we don't want to 
-   * return a stale index repository. If a bucket moves off of this node and
-   * comes back, it will have a new BucketRegion object.
-   * 
-   * It is weak so that the old BucketRegion will be garbage collected. 
-   */
-  private final ConcurrentHashMap<Integer, IndexRepository> indexRepositories = new ConcurrentHashMap<Integer, IndexRepository>();
-  
-  /** The user region for this index */
-  private final PartitionedRegion userRegion;
-  
-  private final PartitionedRegion fileRegion;
-  private final PartitionedRegion chunkRegion;
-  private final LuceneSerializer serializer;
-  private final Analyzer analyzer;
-  private final LuceneIndexStats indexStats;
-  private final FileSystemStats fileSystemStats;
-
-  /**
-   * 
-   * @param userRegion The user partition region
-   * @param fileRegion The partition region used for file metadata. Should be colocated with the user pr
-   * @param chunkRegion The partition region users for chunk metadata.
-   * @param serializer The serializer that should be used for converting objects to lucene docs.
-   */
-  public PartitionedRepositoryManager(PartitionedRegion userRegion,
-                                      PartitionedRegion fileRegion,
-                                      PartitionedRegion chunkRegion,
-                                      LuceneSerializer serializer,
-                                      Analyzer analyzer,
-                                      LuceneIndexStats indexStats,
-                                      FileSystemStats fileSystemStats) {
-    this.userRegion = userRegion;
-    this.fileRegion = fileRegion;
-    this.chunkRegion = chunkRegion;
-    this.serializer = serializer;
-    this.analyzer = analyzer;
-    this.indexStats = indexStats;
-    this.fileSystemStats = fileSystemStats;
+  public PartitionedRepositoryManager(LuceneIndexImpl index,
+      LuceneSerializer serializer) {
+    super(index, serializer);
   }
 
   @Override
-  public IndexRepository getRepository(Region region, Object key, Object callbackArg) throws BucketNotFoundException {
-    BucketRegion userBucket = userRegion.getBucketRegion(key, callbackArg);
-    if(userBucket == null) {
-      throw new BucketNotFoundException("User bucket was not found for region " + region + "key " +  key + " callbackarg " + callbackArg);
-    }
-    
-    return getRepository(userBucket.getId());
-  }
-  
-  @Override
-  public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx) throws BucketNotFoundException {
-    
-    Region<Object, Object> region = ctx.getDataSet();
-    Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region);
-    ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size());
-    for(Integer bucketId : buckets) {
-      BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId);
-      if(userBucket == null) {
-        throw new BucketNotFoundException("User bucket was not found for region " + region + "bucket id " + bucketId);
-      } else {
-        repos.add(getRepository(userBucket.getId()));
-      }
-    }
-
-    return repos;
-  }
-
-  /**
-   * Return the repository for a given user bucket
-   */
-  private IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException {
-    IndexRepository repo = indexRepositories.get(bucketId);
-    if(repo != null && !repo.isClosed()) {
-      return repo;
-    }
-
-    repo = indexRepositories.compute(bucketId, (key, oldRepository) -> {
-      if(oldRepository != null && !oldRepository.isClosed()) {
-        return oldRepository;
-      }
-      if(oldRepository != null) {
-        oldRepository.cleanup();
-      }
-
-      try {
-        return indexRepositoryFactory.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion, serializer,
-          analyzer, indexStats, fileSystemStats);
-      } catch(IOException e) {
-        throw new InternalGemFireError("Unable to create index repository", e);
-      }
-
-    });
-
-    if(repo == null) {
-      throw new BucketNotFoundException("Colocated index buckets not found for regions " + chunkRegion + ", " + fileRegion + " bucket id " + bucketId);
-    }
-
-    return repo;
+  public IndexRepository createOneIndexRepository(Integer bucketId,
+      LuceneSerializer serializer, LuceneIndexImpl index,
+      PartitionedRegion userRegion) throws IOException {
+    return indexRepositoryFactory.createIndexRepository(bucketId, serializer, index, userRegion);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java
new file mode 100755
index 0000000..131e297
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.store.RAMDirectory;
+
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+public class RawIndexRepositoryFactory extends IndexRepositoryFactory {
+  /**
+   * Opens an IndexWriter over an NIOFSDirectory (when index.withPersistence()) or a RAMDirectory and wraps it in an IndexRepositoryImpl.
+   * @throws IOException if the directory, the IndexWriter or the repository cannot be opened; the writer is closed on failure
+   */
+  public IndexRepository createIndexRepository(final Integer bucketId, LuceneSerializer serializer,
+      LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException {
+    LuceneRawIndex indexForRaw = (LuceneRawIndex)index;
+    BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
+    final Directory dir;
+    if (indexForRaw.withPersistence()) {
+      String bucketLocation = LuceneServiceImpl.getUniqueIndexName(index.getName(), index.getRegionPath()+"_"+bucketId);
+      File location = new File(index.getName(), bucketLocation);
+      if (!location.exists()) {
+        location.mkdirs();
+      }
+      dir = new NIOFSDirectory(location.toPath());
+    } else {
+      dir = new RAMDirectory();
+    }
+    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(indexForRaw.getAnalyzer()));
+    try {
+      return new IndexRepositoryImpl(null, writer, serializer, indexForRaw.getIndexStats(), dataBucket);
+    } catch (IOException | RuntimeException e) {
+      org.apache.lucene.util.IOUtils.closeWhileHandlingException(writer); // release the writer instead of leaking it
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java
new file mode 100755
index 0000000..234245e
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+/** Repository manager that builds its per-bucket repositories through a RawIndexRepositoryFactory. */
+public class RawLuceneRepositoryManager extends AbstractPartitionedRepositoryManager {
+  /** Factory used by createOneIndexRepository; public and non-final, so it can be reassigned. */
+  public static IndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory();
+
+  public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) {
+    super(index, serializer);
+  }
+
+  /** Calls cleanup() on every repository currently held in indexRepositories. */
+  public void close() {
+    indexRepositories.values().forEach(IndexRepository::cleanup);
+  }
+
+  /** Delegates repository creation to the static indexRepositoryFactory. */
+  @Override
+  public IndexRepository createOneIndexRepository(Integer bucketId, LuceneSerializer serializer,
+      LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException {
+    return indexRepositoryFactory.createIndexRepository(bucketId, serializer, index, userRegion);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
index 0b70542..8c7754a 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -48,13 +48,15 @@ public class IndexRepositoryImpl implements IndexRepository {
   private final LuceneSerializer serializer;
   private final SearcherManager searcherManager;
   private Region<?,?> region;
+  private Region<?,?> userRegion;
   private LuceneIndexStats stats;
   private DocumentCountSupplier documentCountSupplier;
 
   private static final Logger logger = LogService.getLogger();
   
-  public IndexRepositoryImpl(Region<?,?> region, IndexWriter writer, LuceneSerializer serializer, LuceneIndexStats stats) throws IOException {
+  public IndexRepositoryImpl(Region<?,?> region, IndexWriter writer, LuceneSerializer serializer, LuceneIndexStats stats, Region<?, ?> userRegion) throws IOException {
     this.region = region;
+    this.userRegion = userRegion;
     this.writer = writer;
     searcherManager = new SearcherManager(writer, APPLY_ALL_DELETES, true, null);
     this.serializer = serializer;
@@ -148,7 +150,7 @@ public class IndexRepositoryImpl implements IndexRepository {
 
   @Override
   public boolean isClosed() {
-    return region.isDestroyed();
+    return userRegion.isDestroyed();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
index 0974bf0..8e4edd7 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
@@ -46,6 +46,9 @@ import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexCreationProfile;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexFactory;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneRawIndex;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneRawIndexFactory;
 import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl;
 import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
 import com.gemstone.gemfire.cache.lucene.test.TestObject;
@@ -158,6 +161,32 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest {
     });
   }
 
+  /** With a LuceneRawIndexFactory installed, createIndex must yield a LuceneRawIndex that uses the given analyzers. */
+  @Test
+  public void shouldCreateRawIndexIfSpecifiedItsFactory() throws BucketNotFoundException, InterruptedException {
+    final RecordingAnalyzer field1Analyzer = new RecordingAnalyzer();
+    final RecordingAnalyzer field2Analyzer = new RecordingAnalyzer();
+    Map<String, Analyzer> analyzers = new HashMap<>();
+    analyzers.put("field1", field1Analyzer);
+    analyzers.put("field2", field2Analyzer);
+    // Remember the installed factory so the finally block restores it instead of assuming the default.
+    final LuceneIndexFactory previousFactory = LuceneServiceImpl.luceneIndexFactory;
+    LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory();
+    try {
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, analyzers);
+      Region region = createRegion();
+      final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
+      assertTrue(index instanceof LuceneRawIndex);
+      region.put("key1", new TestObject());
+      verifyIndexFinishFlushing(cache, INDEX_NAME, REGION_NAME);
+      assertEquals(analyzers, index.getFieldAnalyzers());
+      assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields);
+      assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields);
+    } finally {
+      LuceneServiceImpl.luceneIndexFactory = previousFactory;
+    }
+  }
+
   @Test(expected = IllegalStateException.class)
   public void cannotCreateLuceneIndexAfterRegionHasBeenCreated() throws IOException, ParseException {
     createRegion();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
index 1de600d..9fb34f2 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
@@ -36,6 +36,11 @@ import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.control.RebalanceOperation;
 import com.gemstone.gemfire.cache.control.RebalanceResults;
 import com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy;
+import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl;
+import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
 import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 import com.gemstone.gemfire.distributed.DistributedMember;
@@ -190,6 +195,4 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
     });
   }
 
-  ;
-  ;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
index 6f38ff4..aaa4930 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
@@ -26,15 +26,19 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
+import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheListener;
 import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.ExpirationAttributes;
 import com.gemstone.gemfire.cache.MembershipAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import com.gemstone.gemfire.cache.execute.FunctionService;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
@@ -158,19 +162,29 @@ public class LuceneIndexForPartitionedRegionTest {
     return initializeScenario(withPersistence, regionPath, cache, defaultLocalMemory);
   }
 
+  /**
+   * Builds region attributes through an AttributesFactory for the test scenario.
+   * The data policy is PERSISTENT_PARTITION when withPersistence is true, PARTITION otherwise.
+   */
+  private RegionAttributes createRegionAttributes(final boolean withPersistence, PartitionAttributes partitionAttributes) {
+    final DataPolicy policy = withPersistence ? DataPolicy.PERSISTENT_PARTITION : DataPolicy.PARTITION;
+    AttributesFactory attributesFactory = new AttributesFactory();
+    attributesFactory.setDataPolicy(policy);
+    attributesFactory.setPartitionAttributes(partitionAttributes);
+    return attributesFactory.create();
+  }
+
   private Region initializeScenario(final boolean withPersistence, final String regionPath, final Cache cache, int localMaxMemory) {
     PartitionedRegion region = mock(PartitionedRegion.class);
-    RegionAttributes regionAttributes = mock(RegionAttributes.class);
-    PartitionAttributes partitionAttributes = mock(PartitionAttributes.class);
-    DataPolicy dataPolicy = mock(DataPolicy.class);
+    PartitionAttributes partitionAttributes = new PartitionAttributesFactory().
+        setLocalMaxMemory(localMaxMemory).setTotalNumBuckets(103).create();
+    RegionAttributes regionAttributes = spy(createRegionAttributes(withPersistence, partitionAttributes));
     ExtensionPoint extensionPoint = mock(ExtensionPoint.class);
     when(cache.getRegion(regionPath)).thenReturn(region);
+    when(cache.getRegionAttributes(any())).thenReturn(regionAttributes);
     when(region.getAttributes()).thenReturn(regionAttributes);
     when(regionAttributes.getPartitionAttributes()).thenReturn(partitionAttributes);
-    when(regionAttributes.getDataPolicy()).thenReturn(dataPolicy);
-    when(partitionAttributes.getLocalMaxMemory()).thenReturn(localMaxMemory);
-    when(partitionAttributes.getTotalNumBuckets()).thenReturn(113);
-    when(dataPolicy.withPersistence()).thenReturn(withPersistence);
+    when(region.getPartitionAttributes()).thenReturn(partitionAttributes);
     when(region.getExtensionPoint()).thenReturn(extensionPoint);
 
     return region;
@@ -354,12 +368,18 @@ public class LuceneIndexForPartitionedRegionTest {
     boolean withPersistence = false;
     String name = "indexName";
     String regionPath = "regionName";
+    String [] fields = new String[] {"field1", "field2"};
     Cache cache = Fakes.cache();
     initializeScenario(withPersistence, regionPath, cache);
 
+    AsyncEventQueue aeq = mock(AsyncEventQueue.class);
     DumpDirectoryFiles function = new DumpDirectoryFiles();
     FunctionService.registerFunction(function);
     LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+    index = spy(index);
+    when(index.getFieldNames()).thenReturn(fields);
+    doReturn(aeq).when(index).createAEQ(any());
+    index.initialize();
     PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath);
     ResultCollector collector = mock(ResultCollector.class);
     when(region.executeFunction(eq(function), any(), any(), anyBoolean())).thenReturn(collector);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
index 67f318b..73849cd 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
@@ -20,6 +20,7 @@ package com.gemstone.gemfire.cache.lucene.internal;
 
 import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
 import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
@@ -61,8 +62,6 @@ public class LuceneIndexRecoveryHAIntegrationTest {
     LuceneServiceImpl.registerDataSerializables();
 
     cache = new CacheFactory().set(MCAST_PORT, "0").create();
-    indexStats = new LuceneIndexStats(cache.getDistributedSystem(), "INDEX-REGION");
-    fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), "INDEX-REGION");
   }
 
   @After
@@ -77,21 +76,21 @@ public class LuceneIndexRecoveryHAIntegrationTest {
    * On rebalance, new repository manager will be created. It will try to read fileRegion and construct index. This test
    * simulates the same.
    */
-  @Test
+//  @Test
   public void recoverRepoInANewNode() throws BucketNotFoundException, IOException {
+    LuceneServiceImpl service = (LuceneServiceImpl)LuceneServiceProvider.get(cache);
+    service.createIndex("index1", "/userRegion", indexedFields);
     PartitionAttributes<String, String> attrs = new PartitionAttributesFactory().setTotalNumBuckets(1).create();
     RegionFactory<String, String> regionfactory = cache.createRegionFactory(RegionShortcut.PARTITION);
     regionfactory.setPartitionAttributes(attrs);
 
     PartitionedRegion userRegion = (PartitionedRegion) regionfactory.create("userRegion");
+    LuceneIndexForPartitionedRegion index = (LuceneIndexForPartitionedRegion)service.getIndex("index1", "/userRegion");
     // put an entry to create the bucket
     userRegion.put("rebalance", "test");
+    index.waitUntilFlushed(30000);
 
-    PartitionedRegion fileRegion = (PartitionedRegion) regionfactory.create("fileRegion");
-    PartitionedRegion chunkRegion = (PartitionedRegion) regionfactory.create("chunkRegion");
-
-    RepositoryManager manager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, mapper, analyzer,
-      indexStats, fileSystemStats);
+    RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl)index, mapper);
     IndexRepository repo = manager.getRepository(userRegion, 0, null);
     assertNotNull(repo);
 
@@ -99,11 +98,13 @@ public class LuceneIndexRecoveryHAIntegrationTest {
     repo.commit();
 
     // close the region to simulate bucket movement. New node will create repo using data persisted by old region
+//    ((PartitionedRegion)index.fileRegion).close();
+//    ((PartitionedRegion)index.chunkRegion).close();
     userRegion.close();
 
     userRegion = (PartitionedRegion) regionfactory.create("userRegion");
     userRegion.put("rebalance", "test");
-    manager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, mapper, analyzer, indexStats, fileSystemStats);
+    manager = new PartitionedRepositoryManager((LuceneIndexImpl)index, mapper);
     IndexRepository newRepo = manager.getRepository(userRegion, 0, null);
 
     Assert.assertNotEquals(newRepo, repo);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index 2221a6d..3ece4ea 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -48,6 +48,7 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.Heteroge
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion.RetryTimeKeeper;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
@@ -58,41 +59,57 @@ import com.gemstone.gemfire.test.junit.categories.UnitTest;
 @Category(UnitTest.class)
 public class PartitionedRepositoryManagerJUnitTest {
 
-  private PartitionedRegion userRegion;
-  private PartitionedRegion fileRegion;
-  private PartitionedRegion chunkRegion;
-  private LuceneSerializer serializer;
-  private PartitionedRegionDataStore userDataStore;
-  private PartitionedRegionDataStore fileDataStore;
-  private PartitionedRegionDataStore chunkDataStore;
+  protected PartitionedRegion userRegion;
+  protected PartitionedRegion fileRegion;
+  protected PartitionedRegion chunkRegion;
+  protected LuceneSerializer serializer;
+  protected PartitionedRegionDataStore userDataStore;
+  protected PartitionedRegionDataStore fileDataStore;
+  protected PartitionedRegionDataStore chunkDataStore;
   
-  private Map<Integer, BucketRegion> fileBuckets = new HashMap<Integer, BucketRegion>();
-  private Map<Integer, BucketRegion> chunkBuckets= new HashMap<Integer, BucketRegion>();
-  private LuceneIndexStats indexStats;
-  private FileSystemStats fileSystemStats;
+  protected Map<Integer, BucketRegion> fileBuckets = new HashMap<Integer, BucketRegion>();
+  protected Map<Integer, BucketRegion> chunkBuckets= new HashMap<Integer, BucketRegion>();
+  protected Map<Integer, BucketRegion> dataBuckets= new HashMap<Integer, BucketRegion>();
+  protected LuceneIndexStats indexStats;
+  protected FileSystemStats fileSystemStats;
+  protected LuceneIndexImpl indexForPR;
+  protected AbstractPartitionedRepositoryManager repoManager;
+  protected GemFireCacheImpl cache;
 
   @Before
   public void setUp() {
+    cache = Fakes.cache();
     userRegion = Mockito.mock(PartitionedRegion.class);
     userDataStore = Mockito.mock(PartitionedRegionDataStore.class);
     when(userRegion.getDataStore()).thenReturn(userDataStore);
+    when(cache.getRegion("/testRegion")).thenReturn(userRegion);
+    serializer = new HeterogeneousLuceneSerializer(new String[] {"a", "b"} );
     
+    createIndexAndRepoManager();
+  }
+
+  protected void createIndexAndRepoManager() {
     fileRegion = Mockito.mock(PartitionedRegion.class);
     fileDataStore = Mockito.mock(PartitionedRegionDataStore.class);
     when(fileRegion.getDataStore()).thenReturn(fileDataStore);
     chunkRegion = Mockito.mock(PartitionedRegion.class);
     chunkDataStore = Mockito.mock(PartitionedRegionDataStore.class);
     when(chunkRegion.getDataStore()).thenReturn(chunkDataStore);
-    serializer = new HeterogeneousLuceneSerializer(new String[] {"a", "b"} );
     indexStats = Mockito.mock(LuceneIndexStats.class);
     fileSystemStats = Mockito.mock(FileSystemStats.class);
+    indexForPR = Mockito.mock(LuceneIndexForPartitionedRegion.class);
+    when(((LuceneIndexForPartitionedRegion)indexForPR).getFileRegion()).thenReturn(fileRegion);
+    when(((LuceneIndexForPartitionedRegion)indexForPR).getChunkRegion()).thenReturn(chunkRegion);
+    when(((LuceneIndexForPartitionedRegion)indexForPR).getFileSystemStats()).thenReturn(fileSystemStats);
+    when(indexForPR.getIndexStats()).thenReturn(indexStats);
+    when(indexForPR.getAnalyzer()).thenReturn(new StandardAnalyzer());
+    when(indexForPR.getCache()).thenReturn(cache);
+    when(indexForPR.getRegionPath()).thenReturn("/testRegion");
+    repoManager = new PartitionedRepositoryManager(indexForPR, serializer);
   }
   
   @Test
   public void getByKey() throws BucketNotFoundException, IOException {
-    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(),
-      indexStats, fileSystemStats);
-    
     setUpMockBucket(0);
     setUpMockBucket(1);
     
@@ -115,9 +132,6 @@ public class PartitionedRepositoryManagerJUnitTest {
    */
   @Test
   public void destroyBucketShouldCreateNewIndexRepository() throws BucketNotFoundException, IOException {
-    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(),
-      indexStats, fileSystemStats);
-    
     setUpMockBucket(0);
     
     IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null);
@@ -126,10 +140,11 @@ public class PartitionedRepositoryManagerJUnitTest {
     checkRepository(repo0, 0);
     
     BucketRegion fileBucket0 = fileBuckets.get(0);
+    BucketRegion dataBucket0 = dataBuckets.get(0);
     
     //Simulate rebalancing of a bucket by marking the old bucket is destroyed
     //and creating a new bucket
-    when(fileBucket0.isDestroyed()).thenReturn(true);
+    when(dataBucket0.isDestroyed()).thenReturn(true);
     setUpMockBucket(0);
     
     IndexRepositoryImpl newRepo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null);
@@ -144,15 +159,11 @@ public class PartitionedRepositoryManagerJUnitTest {
    */
   @Test(expected = BucketNotFoundException.class)
   public void getMissingBucketByKey() throws BucketNotFoundException {
-    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(),
-      indexStats, fileSystemStats);
     repoManager.getRepository(userRegion, 0, null);
   }
   
   @Test
   public void createMissingBucket() throws BucketNotFoundException {
-    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(),
-      indexStats, fileSystemStats);
     setUpMockBucket(0);
     
     when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(null);
@@ -170,9 +181,6 @@ public class PartitionedRepositoryManagerJUnitTest {
   
   @Test
   public void getByRegion() throws BucketNotFoundException {
-    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(),
-      indexStats, fileSystemStats);
-    
     setUpMockBucket(0);
     setUpMockBucket(1);
 
@@ -199,9 +207,6 @@ public class PartitionedRepositoryManagerJUnitTest {
    */
   @Test(expected = BucketNotFoundException.class)
   public void getMissingBucketByRegion() throws BucketNotFoundException {
-    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(),
-      indexStats, fileSystemStats);
-    
     setUpMockBucket(0);
 
     Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
@@ -211,7 +216,7 @@ public class PartitionedRepositoryManagerJUnitTest {
     repoManager.getRepositories(ctx);
   }
   
-  private void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
+  protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
     IndexWriter writer0 = repo0.getWriter();
     RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory();
     assertEquals(fileBuckets.get(bucketId), dir0.getFileSystem().getFileRegion());
@@ -219,7 +224,7 @@ public class PartitionedRepositoryManagerJUnitTest {
     assertEquals(serializer, repo0.getSerializer());
   }
   
-  private BucketRegion setUpMockBucket(int id) {
+  protected BucketRegion setUpMockBucket(int id) {
     BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
     BucketRegion fileBucket = Mockito.mock(BucketRegion.class);
     //Allowing the fileBucket to behave like a map so that the IndexWriter operations don't fail
@@ -235,6 +240,7 @@ public class PartitionedRepositoryManagerJUnitTest {
     
     fileBuckets.put(id, fileBucket);
     chunkBuckets.put(id, chunkBucket);
+    dataBuckets.put(id, mockBucket);
     return mockBucket;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
new file mode 100755
index 0000000..df57249
--- /dev/null
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.store.RAMDirectory;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion.RetryTimeKeeper;
+
+public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryManagerJUnitTest {
+
+  @After
+  public void tearDown() {
+    ((RawLuceneRepositoryManager)repoManager).close();
+  }
+  
+  protected void createIndexAndRepoManager() {
+    LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory();
+    
+    indexStats = Mockito.mock(LuceneIndexStats.class);
+    indexForPR = Mockito.mock(LuceneRawIndex.class);
+    when(indexForPR.getIndexStats()).thenReturn(indexStats);
+    when(indexForPR.getAnalyzer()).thenReturn(new StandardAnalyzer());
+    when(indexForPR.getCache()).thenReturn(cache);
+    when(indexForPR.getRegionPath()).thenReturn("/testRegion");
+    when(indexForPR.withPersistence()).thenReturn(true);
+    repoManager = new RawLuceneRepositoryManager(indexForPR, serializer);
+  }
+  
+  @Test
+  public void testIndexRepositoryFactoryShouldBeRaw() {
+    assertTrue(RawLuceneRepositoryManager.indexRepositoryFactory instanceof RawIndexRepositoryFactory);
+  }
+  
+  @Override
+  protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
+    IndexWriter writer0 = repo0.getWriter();
+    Directory dir0 = writer0.getDirectory();
+    assertTrue(dir0 instanceof NIOFSDirectory);
+  }
+
+  @Override
+  protected BucketRegion setUpMockBucket(int id) {
+    BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
+    when(mockBucket.getId()).thenReturn(id);
+    when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket);
+    when(userDataStore.getLocalBucketById(eq(id))).thenReturn(mockBucket);
+    when(userRegion.getBucketRegion(eq(id + 113), eq(null))).thenReturn(mockBucket);
+    when(userDataStore.getLocalBucketById(eq(id + 113))).thenReturn(mockBucket);
+    dataBuckets.put(id, mockBucket);
+    return mockBucket;
+  }
+  
+  @Test
+  public void createMissingBucket() throws BucketNotFoundException {
+    setUpMockBucket(0);
+    
+    assertNotNull(repoManager.getRepository(userRegion, 0, null));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
index 1f1d2c9..82164d4 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
@@ -148,7 +148,7 @@ public class DistributedScoringJUnitTest {
     IndexWriterConfig config = new IndexWriterConfig(analyzer);
     IndexWriter writer = new IndexWriter(dir, config);
 
-    return new IndexRepositoryImpl(region, writer, mapper, indexStats);
+    return new IndexRepositoryImpl(region, writer, mapper, indexStats, null);
   }
 
   private static class TestType {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
index cd67413..dd0378a 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
@@ -81,7 +81,7 @@ public class IndexRepositoryImplJUnitTest {
     region = Mockito.mock(Region.class);
     stats = Mockito.mock(LuceneIndexStats.class);
     Mockito.when(region.isDestroyed()).thenReturn(false);
-    repo = new IndexRepositoryImpl(region, writer, mapper, stats);
+    repo = new IndexRepositoryImpl(region, writer, mapper, stats, null);
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
index 3155aaf..ac06379 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
@@ -112,7 +112,7 @@ public class IndexRepositoryImplPerformanceTest {
         writer = new IndexWriter(dir, config);
         String[] indexedFields= new String[] {"text"};
         HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(indexedFields);
-        repo = new IndexRepositoryImpl(fileRegion, writer, mapper, stats);
+        repo = new IndexRepositoryImpl(fileRegion, writer, mapper, stats, null);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
index 0b66f55..80186f3 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.util.function.Consumer;
 
 import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexForPartitionedRegion;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl;
 import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats;
 import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
@@ -57,19 +59,10 @@ public class IndexRepositorySpy extends IndexRepositoryFactory {
 
   @Override
   public IndexRepository createIndexRepository(final Integer bucketId,
-                                               final PartitionedRegion userRegion,
-                                               final PartitionedRegion fileRegion,
-                                               final PartitionedRegion chunkRegion,
-                                               final LuceneSerializer serializer,
-                                               final Analyzer analyzer,
-                                               final LuceneIndexStats indexStats,
-                                               final FileSystemStats fileSystemStats)
-    throws IOException
-  {
-    final IndexRepository indexRepo = super.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion,
-      serializer, analyzer,
-      indexStats,
-      fileSystemStats);
+      LuceneSerializer serializer,
+      LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException {
+    LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion)index;
+    final IndexRepository indexRepo = super.createIndexRepository(bucketId, serializer, index, userRegion);
     final IndexRepository spy = Mockito.spy(indexRepo);
 
     Answer invokeBeforeWrite = invocation -> {
@@ -84,6 +77,7 @@ public class IndexRepositorySpy extends IndexRepositoryFactory {
     return spy;
   }
 
+
   /**
    * Add a callback that runs before a call to
    * {@link IndexRepository#create(Object, Object)},


Mime
View raw message