geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [26/47] incubator-geode git commit: GEODE-356: Fix unexpected UNDEFINED in OQL query result set
Date Fri, 08 Jan 2016 18:42:27 GMT
GEODE-356: Fix unexpected UNDEFINED in OQL query result set


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b6a89ad8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b6a89ad8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b6a89ad8

Branch: refs/heads/feature/GEODE-714
Commit: b6a89ad85376aafe16e47b2c3d0428cdff9c38d5
Parents: fb38186
Author: Jianxia Chen <jchen@pivotal.io>
Authored: Wed Jan 6 11:01:06 2016 -0800
Committer: Jianxia Chen <jchen@pivotal.io>
Committed: Wed Jan 6 11:01:06 2016 -0800

----------------------------------------------------------------------
 .../query/internal/index/CompactRangeIndex.java |   8 +-
 .../query/internal/index/MemoryIndexStore.java  |  33 +-
 .../functional/IndexOnEntrySetJUnitTest.java    | 335 +++++++++++++++++++
 3 files changed, 372 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a89ad8/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/CompactRangeIndex.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/CompactRangeIndex.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/CompactRangeIndex.java
index 41e2ea4..79d0c54 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/CompactRangeIndex.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/CompactRangeIndex.java
@@ -16,7 +16,6 @@
  */
 
 package com.gemstone.gemfire.cache.query.internal.index;
-import  com.gemstone.gemfire.internal.cache.CachedDeserializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -28,6 +27,7 @@ import java.util.Set;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.EntryDestroyedException;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.query.AmbiguousNameException;
 import com.gemstone.gemfire.cache.query.FunctionDomainException;
@@ -908,6 +908,9 @@ public class CompactRangeIndex extends AbstractIndex {
                 result.add(new CqEntry(indexEntry.getDeserializedRegionKey(),
                     value));
               } else {
+                if (IndexManager.testHook != null) {
+                  IndexManager.testHook.hook(200);
+                }
                 applyProjection(projAttrib, context, result, value,
                     intermediateResults, isIntersection);
               }
@@ -921,6 +924,9 @@ public class CompactRangeIndex extends AbstractIndex {
       } catch (ClassCastException e) {
         
       }
+      catch(EntryDestroyedException e) {
+        //ignore it
+      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a89ad8/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/MemoryIndexStore.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/MemoryIndexStore.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/MemoryIndexStore.java
index 966d82a..41de5ea 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/MemoryIndexStore.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/index/MemoryIndexStore.java
@@ -503,7 +503,7 @@ public class MemoryIndexStore implements IndexStore {
     } else if (indexOnRegionKeys) {
       return entry.getKey();
     }
-    return ((LocalRegion) this.region).new NonTXEntry(entry);
+    return new CachedEntryWrapper(((LocalRegion) this.region).new NonTXEntry(entry));
   }
 
   public Object getTargetObjectInVM(RegionEntry entry) {
@@ -588,6 +588,7 @@ public class MemoryIndexStore implements IndexStore {
     protected Iterator<Map.Entry> mapIterator;
     protected Iterator valuesIterator;
     protected Object currKey;
+    protected Object currValue; //RegionEntry
     final long iteratorStartTime = GemFireCacheImpl.getInstance().cacheTimeMillis();
     protected MemoryIndexStoreEntry currentEntry = new MemoryIndexStoreEntry(iteratorStartTime);
     
@@ -640,8 +641,8 @@ public class MemoryIndexStore implements IndexStore {
         if (values instanceof Collection) {
           this.valuesIterator = ((Collection) values).iterator();
         } else {
-          currentEntry.setMemoryIndexStoreEntry(currKey, (RegionEntry) values);
           this.valuesIterator = null;
+          currValue = values;
         }
         return values != null &&
                 (values instanceof RegionEntry
@@ -659,6 +660,7 @@ public class MemoryIndexStore implements IndexStore {
      */
     public MemoryIndexStoreEntry next() {
       if (valuesIterator == null) {
+        currentEntry.setMemoryIndexStoreEntry(currKey, (RegionEntry) currValue);
         return currentEntry;
       }
 
@@ -728,7 +730,7 @@ public class MemoryIndexStore implements IndexStore {
     private RegionEntry regionEntry;
     private boolean updateInProgress;
     private Object value;
-    private long iteratorStartTime;
+    private long iteratorStartTime;    
 
     private MemoryIndexStoreEntry(long iteratorStartTime) {
     	this.iteratorStartTime = iteratorStartTime;
@@ -770,5 +772,30 @@ public class MemoryIndexStore implements IndexStore {
           ||  IndexManager.needsRecalculation(iteratorStartTime, regionEntry.getLastModified());
     }
   }
+  
+  class CachedEntryWrapper {
+
+    private Object key, value;
+
+    public CachedEntryWrapper(LocalRegion.NonTXEntry entry) {
+      this.key = entry.getKey();
+      this.value = entry.getValue();
+    }
+
+    public Object getKey() {
+      return this.key;
+    }
+
+    public Object getValue() {
+      return this.value;
+    }
+
+    public String toString() {
+      return new StringBuilder("CachedEntryWrapper@").append(
+          Integer.toHexString(System.identityHashCode(this))).append(' ')
+          .append(this.key).append(' ').append(this.value).toString();
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b6a89ad8/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/functional/IndexOnEntrySetJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/functional/IndexOnEntrySetJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/functional/IndexOnEntrySetJUnitTest.java
new file mode 100644
index 0000000..e375b90
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/functional/IndexOnEntrySetJUnitTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import java.text.ParseException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.Index;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.internal.QueryObserverAdapter;
+import com.gemstone.gemfire.cache.query.internal.QueryObserverHolder;
+import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class IndexOnEntrySetJUnitTest {
+
+  private static String testRegionName = "regionName";
+  private static Region testRegion;
+  private static int numElem = 100;
+  private String newValue = "NEW VALUE";
+
+   @Before
+  public void setUp() throws Exception {    
+    System.setProperty("gemfire.Query.VERBOSE", "true");
+    CacheUtils.startCache();
+  }
+
+   @After
+  public void tearDown() throws Exception {
+    // Destroy current Region for other tests    
+    IndexManager.testHook = null;
+    if (testRegion != null) {
+      testRegion.destroyRegion();
+    }
+    CacheUtils.closeCache();    
+  }
+
+  private String[] getQueriesOnRegion(String regionName) {
+    return new String[] {"SELECT DISTINCT entry.value, entry.key FROM /" + regionName + ".entrySet entry WHERE entry.key.PartitionID > 0 AND " + "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
+    "SELECT DISTINCT entry.value, entry.key FROM /" + regionName + ".entrySet entry WHERE entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
+    "SELECT DISTINCT * FROM /" + regionName + ".entrySet entry WHERE entry.key.PartitionID > 0 AND " + "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
+    "SELECT DISTINCT entry.value, entry.key FROM /" + regionName + ".entrySet entry WHERE entry.key.PartitionID > 0 AND " + "entry.key.Index > 1 LIMIT 2",
+    "SELECT DISTINCT entry.value, entry.key FROM /" + regionName + ".entrySet entry WHERE entry.key.PartitionID > 0 AND " + "entry.key.Index > 1 ORDER BY entry.key.Index ASC",};
+  }
+  
+  private String[] getQueriesOnRegionForPut(String regionName) {
+    return new String[] {"SELECT DISTINCT entry.value, entry.key FROM /" + regionName + ".entrySet entry WHERE entry.key.PartitionID = 50 AND " + "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
+        "SELECT DISTINCT entry.value, entry.key FROM /" + regionName + ".entrySet entry WHERE entry.value = 50 AND " + "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2"};
+  }
+
+  /**
+   * Test queries with index on replicated regions and concurrent PUT, DESTROY, INVALIDATE operations.
+   * Make sure there is no UNDEFINED in the query result.
+   */
+  @Test
+  public void testQueriesOnReplicatedRegion() throws Exception {
+    testRegion = createReplicatedRegion(testRegionName);
+    String regionPath = "/" + testRegionName + ".entrySet entry";
+    executeQueryTest(getQueriesOnRegion(testRegionName), "entry.key.Index", regionPath);
+  }
+
+  /**
+   * Test queries with index on partitioned regions and concurrent PUT, DESTROY, INVALIDATE operations.
+   * Make sure there is no UNDEFINED in the query result.
+   */
+  @Test
+  public void testQueriesOnPartitionedRegion() throws Exception {
+    testRegion = createPartitionedRegion(testRegionName);
+    String regionPath = "/" + testRegionName + ".entrySet entry";
+    executeQueryTest(getQueriesOnRegion(testRegionName), "entry.key.Index", regionPath);
+  }
+
+  private Region createReplicatedRegion(String regionName) throws ParseException {
+    Cache cache = CacheUtils.getCache();
+    AttributesFactory attributesFactory = new AttributesFactory();
+    attributesFactory.setDataPolicy(DataPolicy.REPLICATE);
+    RegionAttributes regionAttributes = attributesFactory.create();
+    return cache.createRegion(regionName, regionAttributes);
+  }
+
+  private Region createPartitionedRegion(String regionName) throws ParseException {
+    Cache cache = CacheUtils.getCache();
+    PartitionAttributesFactory prAttFactory = new PartitionAttributesFactory();
+    AttributesFactory attributesFactory = new AttributesFactory();
+    attributesFactory.setPartitionAttributes(prAttFactory.create());
+    RegionAttributes regionAttributes = attributesFactory.create();
+    return cache.createRegion(regionName, regionAttributes);
+  }
+
+  private void populateRegion(Region region) throws Exception {
+    for (int i = 1; i <= numElem; i++) {
+      putData(i, region);
+    }
+  }
+
+  private void putData(int id, Region region) throws ParseException {
+    region.put(new SomeKey(id, id), id);
+  }
+  
+  private void clearData(Region region) {
+    Iterator it = region.entrySet().iterator();
+    while (it.hasNext()) {
+      Region.Entry entry = (Region.Entry) it.next();
+      region.destroy(entry.getKey());
+    }
+  }
+
+  /**** Query Execution Helpers ****/
+
+  private void executeQueryTest(String[] queries, String indexedExpression, String regionPath) throws Exception {
+    Cache cache = CacheUtils.getCache();
+    boolean[] booleanVals = {true, false};
+    for (String query : queries) {
+      for (boolean isDestroy : booleanVals) {
+        clearData(testRegion);
+        populateRegion(testRegion);
+        Assert.assertNotNull(cache.getRegion(testRegionName));
+        Assert.assertEquals(numElem, cache.getRegion(testRegionName).size());
+        if (isDestroy) {
+          helpTestFunctionalIndexForQuery(query, indexedExpression, regionPath, new DestroyEntryTestHook(testRegion));
+        }
+        else {
+          helpTestFunctionalIndexForQuery(query, indexedExpression, regionPath, new InvalidateEntryTestHook(testRegion));
+        }
+      }
+    }
+    
+    queries = getQueriesOnRegionForPut(testRegionName);
+    for (String query : queries) {
+      clearData(testRegion);
+      populateRegion(testRegion);
+      Assert.assertNotNull(cache.getRegion(testRegionName));
+      Assert.assertEquals(numElem, cache.getRegion(testRegionName).size());
+      helpTestFunctionalIndexForQuery(query, indexedExpression, regionPath, new PutEntryTestHook(testRegion));
+    }
+  }
+
+  /**
+   *  helper method to test against a functional index
+   *  make sure there is no UNDEFINED result
+   * 
+   * @param query
+   * 
+   * @throws Exception */
+  private SelectResults helpTestFunctionalIndexForQuery(String query, String indexedExpression, String regionPath, AbstractTestHook testHook) throws Exception {
+    MyQueryObserverAdapter observer = new MyQueryObserverAdapter();
+    QueryObserverHolder.setInstance(observer);
+    IndexManager.testHook = testHook;
+    QueryService qs = CacheUtils.getQueryService();
+    Index index = qs.createIndex("testIndex", indexedExpression, regionPath);
+    SelectResults indexedResults = (SelectResults) qs.newQuery(query).execute();
+    Iterator iterator = indexedResults.iterator();
+    while (iterator.hasNext()) {
+      Object row = iterator.next();
+      if (row instanceof Struct) {
+        Object[] fields = ((Struct) row).getFieldValues();
+        for (Object field: fields) {
+          Assert.assertTrue(field != QueryService.UNDEFINED);
+          if (field instanceof String) {
+            Assert.assertTrue(((String) field).compareTo(newValue) != 0);
+          }
+        }
+      }
+      else {
+        Assert.assertTrue(row != QueryService.UNDEFINED);
+        if (row instanceof String) {
+          Assert.assertTrue(((String) row).compareTo(newValue) != 0);
+        }
+      }
+    }
+    Assert.assertTrue(indexedResults.size() > 0);
+    Assert.assertTrue(observer.indexUsed);
+    Assert.assertTrue(((AbstractTestHook) IndexManager.testHook).isTestHookCalled());
+    ((AbstractTestHook) IndexManager.testHook).reset();
+    qs.removeIndex(index);
+   
+    return indexedResults;
+  }
+
+  class MyQueryObserverAdapter extends QueryObserverAdapter {
+    public boolean indexUsed = false;
+
+    public void afterIndexLookup(Collection results) {
+      super.afterIndexLookup(results);
+      indexUsed = true;
+    }
+  }
+
+  class SomeKey {
+    public int Index = 1;
+    public int PartitionID = 1;
+
+    public SomeKey(int index, int partitionId) {
+      this.Index = index;
+      this.PartitionID = partitionId;
+    }
+
+    public boolean equals(Object other) {
+      if (other instanceof SomeKey) {
+        SomeKey otherKey = (SomeKey) other;
+        return this.Index == otherKey.Index && this.PartitionID == otherKey.PartitionID;
+      }
+      return false;
+    }
+
+    public String toString() {
+      return "somekey:" + Index + "," + PartitionID;
+
+    }
+  }
+
+  /**
+   * Test hook
+   */
+  abstract class AbstractTestHook implements IndexManager.TestHook {
+    boolean isTestHookCalled = false;
+    Object waitObj = new Object();
+    Region r;
+
+    public void reset() {
+      isTestHookCalled = false;
+    }
+    
+    public boolean isTestHookCalled() {
+      return isTestHookCalled;
+    }
+    
+    /**
+     * Subclass override with different operation
+     */
+    public abstract void doOp();
+    
+    @Override
+    public void hook(int spot) throws RuntimeException {
+      if (spot == 200) {
+        if (!isTestHookCalled) {
+          isTestHookCalled = true;
+          try {
+            new Thread(new Runnable() {
+              public void run() {
+                doOp();
+                synchronized (waitObj) {
+                  waitObj.notifyAll();
+                }
+              }
+            }).start();
+            synchronized (waitObj) {
+              waitObj.wait();
+            }
+          } catch (InterruptedException e) {
+            throw new Error(e);
+          }
+        }
+      }
+
+    }
+
+  }
+  
+  class DestroyEntryTestHook extends AbstractTestHook {
+    
+    DestroyEntryTestHook(Region r) {
+      this.r = r;
+    }
+    
+    public void doOp() {
+      Iterator it = r.entrySet().iterator();
+      while (it.hasNext()) {
+        Region.Entry entry = (Region.Entry) it.next();
+          r.destroy(entry.getKey());
+        }      
+    }
+  }
+  
+  class InvalidateEntryTestHook extends AbstractTestHook {
+    
+    InvalidateEntryTestHook(Region r) {
+      this.r = r;
+    }
+    
+    public void doOp() {
+      Iterator it = r.entrySet().iterator();
+      while (it.hasNext()) {
+        Region.Entry entry = (Region.Entry) it.next();
+          r.invalidate(entry.getKey());
+      }      
+    }
+  }
+  
+  class PutEntryTestHook extends AbstractTestHook {
+    
+    PutEntryTestHook(Region r) {
+      this.r = r;
+    }
+    
+    public void doOp() {
+      Iterator it = r.entrySet().iterator();
+      while (it.hasNext()) {
+        Region.Entry entry = (Region.Entry) it.next();
+        r.put(entry.getKey(), newValue);
+      }      
+    }
+  }
+}


Mime
View raw message