geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [33/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Wed, 20 Jan 2016 02:22:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/RemoteCQTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/RemoteCQTransactionDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/RemoteCQTransactionDUnitTest.java
new file mode 100755
index 0000000..6413518
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/RemoteCQTransactionDUnitTest.java
@@ -0,0 +1,1106 @@
+package com.gemstone.gemfire.internal.cache;
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheEvent;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.CacheWriter;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.InterestPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionEvent;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.TransactionEvent;
+import com.gemstone.gemfire.cache.TransactionListener;
+import com.gemstone.gemfire.cache.TransactionWriter;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqEvent;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.execute.CustomerIDPartitionResolver;
+import com.gemstone.gemfire.internal.cache.execute.data.CustId;
+import com.gemstone.gemfire.internal.cache.execute.data.Customer;
+import com.gemstone.gemfire.internal.cache.execute.data.Order;
+import com.gemstone.gemfire.internal.cache.execute.data.OrderId;
+
+import dunit.Host;
+import dunit.SerializableCallable;
+import dunit.VM;
+
+/**
+ * @author sbawaska
+ *
+ */
+public class RemoteCQTransactionDUnitTest extends CacheTestCase {
+  final protected String CUSTOMER = "custRegion";
+  final protected String ORDER = "orderRegion";
+  final protected String D_REFERENCE = "distrReference";
+  
+  private final SerializableCallable getNumberOfTXInProgress = new SerializableCallable() {
+    public Object call() throws Exception {
+      TXManagerImpl mgr = getGemfireCache().getTxManager();
+      return mgr.hostedTransactionsInProgressForTest();
+    }
+  };
+  private final SerializableCallable verifyNoTxState = new SerializableCallable() {
+    public Object call() throws Exception {
+      //TXManagerImpl mgr = getGemfireCache().getTxManager();
+      //assertEquals(0, mgr.hostedTransactionsInProgressForTest());
+      final TXManagerImpl mgr = getGemfireCache().getTxManager();
+      waitForCriterion(new WaitCriterion() {
+        @Override
+        public boolean done() {
+          return mgr.hostedTransactionsInProgressForTest() == 0;
+        }
+
+        @Override
+        public String description() {
+          return "";
+        }
+      }, 30 * 1000, 500, true/*throwOnTimeout*/);
+      return null;
+    }
+  };
+  /**
+   * @param name
+   */
+  public RemoteCQTransactionDUnitTest(String name) {
+    super(name);
+  }
+
+  protected enum OP {
+    PUT, GET, DESTROY, INVALIDATE, KEYS, VALUES, ENTRIES, PUTALL, GETALL, REMOVEALL
+  }
+  
+  @Override
+  public void tearDown2() throws Exception {
+//    try { Thread.sleep(5000); } catch (InterruptedException e) { } // FOR MANUAL TESTING OF STATS - DON"T KEEP THIS
+    try {
+      invokeInEveryVM(verifyNoTxState);
+    } finally {
+      closeAllCache();
+      super.tearDown2();
+    }
+  }
+  
+  void createRegion(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) {
+    AttributesFactory af = new AttributesFactory();
+    af.setScope(Scope.DISTRIBUTED_ACK);
+    af.setDataPolicy(DataPolicy.REPLICATE);
+    af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
+    getCache().createRegion(D_REFERENCE,af.create());
+    af = new AttributesFactory();
+    af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
+    if (interestPolicy != null) {
+      af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy));
+    }
+    af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
+        .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1)
+        .setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
+        .setRedundantCopies(redundantCopies).create());
+    getCache().createRegion(CUSTOMER, af.create());
+    af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>()
+        .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1)
+        .setPartitionResolver(new CustomerIDPartitionResolver("resolver2"))
+        .setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER).create());
+    getCache().createRegion(ORDER, af.create());
+  }
+
+  protected boolean getConcurrencyChecksEnabled() {
+    return false;
+  }
+
+  void populateData() {
+    Region custRegion = getCache().getRegion(CUSTOMER);
+    Region orderRegion = getCache().getRegion(ORDER);
+    Region refRegion = getCache().getRegion(D_REFERENCE);
+    for (int i=0; i<5; i++) {
+      CustId custId = new CustId(i);
+      Customer customer = new Customer("customer"+i, "address"+i);
+      OrderId orderId = new OrderId(i, custId);
+      Order order = new Order("order"+i);
+      custRegion.put(custId, customer);
+      orderRegion.put(orderId, order);
+      refRegion.put(custId,customer);
+    }
+  }
+
+  protected void initAccessorAndDataStore(VM accessor, VM datastore, final int redundantCopies) {
+    accessor.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        createRegion(true/*accessor*/, redundantCopies, null);
+        return null;
+      }
+    });
+
+    datastore.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        createRegion(false/*accessor*/, redundantCopies, null);
+        populateData();
+        return null;
+      }
+    });
+  }
+
+  protected void initAccessorAndDataStore(VM accessor, VM datastore1, VM datastore2, final int redundantCopies) {
+    datastore2.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        createRegion(false/*accessor*/, redundantCopies, null);
+        return null;
+      }
+    });
+
+    initAccessorAndDataStore(accessor, datastore1, redundantCopies);
+  }
+
+  private void initAccessorAndDataStoreWithInterestPolicy(VM accessor, VM datastore1, VM datastore2, final int redundantCopies) {
+    datastore2.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        createRegion(false/*accessor*/, redundantCopies, InterestPolicy.ALL);
+        return null;
+      }
+    });
+
+    initAccessorAndDataStore(accessor, datastore1, redundantCopies);
+  }
+
+  
+  void validateContains(CustId custId, Set<OrderId> orderId, boolean doesIt)  {
+    validateContains(custId,orderId,doesIt,doesIt);
+  }
+  
+  void validateContains(CustId custId, Set<OrderId> ordersSet, boolean containsKey,boolean containsValue) {
+    Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+    Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+    Region<CustId, Order> refRegion = getCache().getRegion(D_REFERENCE);
+    boolean rContainsKC = custRegion.containsKey(custId);
+    boolean rContainsKO = containsKey;
+    for (OrderId o : ordersSet) {
+      getGemfireCache().getLoggerI18n().fine("SWAP:rContainsKO:"+rContainsKO+" containsKey:"+orderRegion.containsKey(o));
+      rContainsKO = rContainsKO && orderRegion.containsKey(o);
+    }
+    boolean rContainsKR = refRegion.containsKey(custId);
+    
+    boolean rContainsVC = custRegion.containsValueForKey(custId);
+    boolean rContainsVO = containsValue;
+    for (OrderId o: ordersSet) {
+      rContainsVO = rContainsVO && orderRegion.containsValueForKey(o);
+    }
+    boolean rContainsVR = refRegion.containsValueForKey(custId);
+    
+    
+    assertEquals(containsKey,rContainsKC);
+    assertEquals(containsKey,rContainsKO);
+    assertEquals(containsKey,rContainsKR);
+    assertEquals(containsValue,rContainsVR);
+    assertEquals(containsValue,rContainsVC);
+    assertEquals(containsValue,rContainsVO);
+    
+    
+    if(containsKey) {
+      Region.Entry eC =  custRegion.getEntry(custId);
+      for (OrderId o : ordersSet) {
+        assertNotNull(orderRegion.getEntry(o));
+      }
+      Region.Entry eR = refRegion.getEntry(custId);
+      assertNotNull(eC);
+      assertNotNull(eR);
+//      assertEquals(1,custRegion.size());
+  //    assertEquals(1,orderRegion.size());
+    //  assertEquals(1,refRegion.size());
+      
+    } else {
+      //assertEquals(0,custRegion.size());
+      //assertEquals(0,orderRegion.size());
+      //assertEquals(0,refRegion.size());
+      try {
+        Region.Entry eC =  custRegion.getEntry(custId);
+        assertNull("should have had an EntryNotFoundException:"+eC,eC);
+        
+      } catch(EntryNotFoundException enfe) {
+        // this is what we expect
+      }
+      try {
+        for (OrderId o : ordersSet) {
+          assertNull("should have had an EntryNotFoundException:"+orderRegion.getEntry(o),orderRegion.getEntry(o));
+        }
+        
+      } catch(EntryNotFoundException enfe) {
+        // this is what we expect
+      }
+      try {
+        Region.Entry eR =  refRegion.getEntry(custId);
+        assertNull("should have had an EntryNotFoundException:"+eR,eR);
+      } catch(EntryNotFoundException enfe) {
+        // this is what we expect
+      }
+      
+    }
+    
+  }
+
+
+  void verifyAfterCommit(OP op) {
+    Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+    Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE);
+    Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+    CustId custId = new CustId(1);
+    OrderId orderId = new OrderId(1, custId);
+    OrderId orderId2 = new OrderId(2, custId);
+    OrderId orderId3 = new OrderId(3, custId);
+    Customer expectedCust;
+    Order expectedOrder;
+    Order expectedOrder2;
+    Order expectedOrder3;
+    Customer expectedRef;
+    switch (op) {
+    case PUT:
+      expectedCust = new Customer("foo", "bar");
+      expectedOrder = new Order("fooOrder");
+      expectedOrder2 = new Order("fooOrder2");
+      expectedOrder3 = new Order("fooOrder3");
+      expectedRef = expectedCust;
+      assertNotNull(custRegion.getEntry(custId));
+      assertEquals(expectedCust, custRegion.getEntry(custId).getValue());
+      /*
+      assertNotNull(orderRegion.getEntry(orderId));
+      assertEquals(expectedOrder, orderRegion.getEntry(orderId).getValue());
+      
+      assertNotNull(orderRegion.getEntry(orderId2));
+      assertEquals(expectedOrder2, orderRegion.getEntry(orderId2).getValue());
+      
+      assertNotNull(orderRegion.getEntry(orderId3));
+      assertEquals(expectedOrder3, orderRegion.getEntry(orderId3).getValue());
+      */
+      assertNotNull(refRegion.getEntry(custId));
+      assertEquals(expectedRef, refRegion.getEntry(custId).getValue());
+      
+      //Set<OrderId> ordersSet = new HashSet<OrderId>();
+      //ordersSet.add(orderId);ordersSet.add(orderId2);ordersSet.add(orderId3);
+      //validateContains(custId, ordersSet, true);
+      break;
+    case GET:
+      expectedCust = custRegion.get(custId);
+      expectedOrder = orderRegion.get(orderId);
+      expectedRef = refRegion.get(custId);
+      validateContains(custId, Collections.singleton(orderId), true);
+      break;
+    case DESTROY:
+      assertTrue(!custRegion.containsKey(custId));
+      assertTrue(!orderRegion.containsKey(orderId));
+      assertTrue(!refRegion.containsKey(custId));
+      validateContains(custId, Collections.singleton(orderId), false);
+      break;
+    case INVALIDATE:
+      boolean validateContainsKey = true;
+      if (!((GemFireCacheImpl)custRegion.getCache()).isClient()) {
+        assertTrue(custRegion.containsKey(custId));
+        assertTrue(orderRegion.containsKey(orderId));
+        assertTrue(refRegion.containsKey(custId));
+      }
+      assertNull(custRegion.get(custId));
+      assertNull(orderRegion.get(orderId));
+      assertNull(refRegion.get(custId));
+      validateContains(custId,Collections.singleton(orderId),validateContainsKey,false);
+      break;
+    default:
+      throw new IllegalStateException();
+    }
+  }
+  
+  
+  void verifyAfterRollback(OP op) {
+    Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+    Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+    Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE);
+    assertNotNull(custRegion);
+    assertNotNull(orderRegion);
+    assertNotNull(refRegion);
+    
+    CustId custId = new CustId(1);
+    OrderId orderId = new OrderId(1, custId);
+    OrderId orderId2 = new OrderId(2, custId);
+    OrderId orderId3 = new OrderId(3, custId);
+    Customer expectedCust;
+    Order expectedOrder;
+    Customer expectedRef;
+    switch (op) {
+    case PUT:
+      expectedCust = new Customer("customer1", "address1");
+      expectedOrder = new Order("order1");
+      expectedRef = new Customer("customer1", "address1");
+      assertEquals(expectedCust, custRegion.getEntry(custId).getValue());
+      assertEquals(expectedOrder, orderRegion.getEntry(orderId).getValue());
+      getCache().getLogger().info("SWAP:verifyRollback:"+orderRegion);
+      getCache().getLogger().info("SWAP:verifyRollback:"+orderRegion.getEntry(orderId2));
+      assertNull(getGemfireCache().getTXMgr().getTXState());
+      assertNull(""+orderRegion.getEntry(orderId2),orderRegion.getEntry(orderId2));
+      assertNull(orderRegion.getEntry(orderId3));
+      assertNull(orderRegion.get(orderId2));
+      assertNull(orderRegion.get(orderId3));
+      assertEquals(expectedRef, refRegion.getEntry(custId).getValue());
+      validateContains(custId, Collections.singleton(orderId), true);
+      break;
+    case GET:
+      expectedCust = custRegion.getEntry(custId).getValue();
+      expectedOrder = orderRegion.getEntry(orderId).getValue();
+      expectedRef = refRegion.getEntry(custId).getValue();
+      validateContains(custId, Collections.singleton(orderId), true);
+      break;
+    case DESTROY:
+      assertTrue(!custRegion.containsKey(custId));
+      assertTrue(!orderRegion.containsKey(orderId));
+      assertTrue(!refRegion.containsKey(custId));
+      validateContains(custId, Collections.singleton(orderId), true);
+      break;
+    case INVALIDATE:
+      assertTrue(custRegion.containsKey(custId));
+      assertTrue(orderRegion.containsKey(orderId));
+      assertTrue(refRegion.containsKey(custId));
+      assertNull(custRegion.get(custId));
+      assertNull(orderRegion.get(orderId));
+      assertNull(refRegion.get(custId));
+      validateContains(custId,Collections.singleton(orderId),true,true);
+      break;
+    default:
+      throw new IllegalStateException();
+    }
+  }
+  
+  
+  
+
+
+  abstract class CacheCallback {
+    protected boolean isAccessor;
+    protected Exception ex = null;
+    protected void verifyOrigin(EntryEvent event) {
+      try {
+        assertEquals(!isAccessor, event.isOriginRemote());
+      } catch (Exception e) {
+        ex = e;
+      }
+    }
+    protected void verifyPutAll(EntryEvent event) {
+      CustId knownCustId = new CustId(1);
+      OrderId knownOrderId = new OrderId(2, knownCustId);
+      if (event.getKey().equals(knownOrderId)) {
+        try {
+          assertTrue(event.getOperation().isPutAll());
+          assertNotNull(event.getTransactionId());
+        } catch (Exception e) {
+          ex = e;
+        }
+      }
+    }
+
+  }
+  
+  class TestCacheListener extends CacheCallback implements CacheListener {
+    TestCacheListener(boolean isAccessor) {
+      this.isAccessor = isAccessor;
+    }
+    public void afterCreate(EntryEvent event) {
+      verifyOrigin(event);
+      verifyPutAll(event);
+    }
+    public void afterUpdate(EntryEvent event) {
+      verifyOrigin(event);
+      verifyPutAll(event);
+    }
+    public void afterDestroy(EntryEvent event) {
+      verifyOrigin(event);
+    }
+    public void afterInvalidate(EntryEvent event) {
+      verifyOrigin(event);
+    }
+    public void afterRegionClear(RegionEvent event) {
+    }
+    public void afterRegionCreate(RegionEvent event) {
+    }
+    public void afterRegionDestroy(RegionEvent event) {
+    }
+    public void afterRegionInvalidate(RegionEvent event) {
+    }
+    public void afterRegionLive(RegionEvent event) {
+    }
+    public void close() {
+    }
+  }
+
+  class TestCacheWriter extends CacheCallback implements CacheWriter {
+    
+    private volatile boolean wasFired = false;
+    
+    TestCacheWriter(boolean isAccessor) {
+      this.isAccessor = isAccessor;
+    }
+    public void beforeCreate(EntryEvent event) throws CacheWriterException {
+      getGemfireCache().getLogger().info("SWAP:beforeCreate:"+event+" op:"+event.getOperation());
+      verifyOrigin(event);
+      verifyPutAll(event);
+      setFired(event);
+    }
+    public void setFired(EntryEvent event) {
+      wasFired = true;
+    }
+    public void beforeUpdate(EntryEvent event) throws CacheWriterException {
+      getGemfireCache().getLogger().info("SWAP:beforeCreate:"+event+" op:"+event.getOperation());
+      verifyOrigin(event);
+      verifyPutAll(event);
+      setFired(event);
+    }
+    public void beforeDestroy(EntryEvent event) throws CacheWriterException {
+      verifyOrigin(event);
+      setFired(event);
+    }
+    public void beforeRegionClear(RegionEvent event)
+        throws CacheWriterException {
+      setFired(null);
+    }
+    public void beforeRegionDestroy(RegionEvent event)
+        throws CacheWriterException {
+      setFired(null);
+    }
+    public void close() {
+    }
+  }
+
+  abstract class txCallback {
+    protected boolean isAccessor;
+    protected Exception ex = null;
+    protected void verify(TransactionEvent txEvent) {
+      for (CacheEvent e : txEvent.getEvents()) {
+        verifyOrigin(e);
+        verifyPutAll(e);
+      }
+    }
+    private void verifyOrigin(CacheEvent event) {
+      try {
+        assertEquals(true, event.isOriginRemote()); //change to !isAccessor after fixing #41498
+      } catch (Exception e) {
+        ex = e;
+      }
+    }
+    private void verifyPutAll(CacheEvent p_event) {
+      if (!(p_event instanceof EntryEvent)) {
+        return;
+      }
+      EntryEvent event = (EntryEvent)p_event;
+      CustId knownCustId = new CustId(1);
+      OrderId knownOrderId = new OrderId(2, knownCustId);
+      if (event.getKey().equals(knownOrderId)) {
+        try {
+          assertTrue(event.getOperation().isPutAll());
+          assertNotNull(event.getTransactionId());
+        } catch (Exception e) {
+          ex = e;
+        }
+      }
+    }
+  }
+  
+  class TestTxListener extends txCallback implements TransactionListener {
+    private boolean listenerInvoked;
+    TestTxListener(boolean isAccessor) {
+      this.isAccessor = isAccessor;
+    }
+    public void afterCommit(TransactionEvent event) {
+      listenerInvoked = true;
+      verify(event);
+    }
+    public void afterFailedCommit(TransactionEvent event) {
+      verify(event);
+    }
+    public void afterRollback(TransactionEvent event) {
+      listenerInvoked = true;
+      verify(event);
+    }
+    public boolean isListenerInvoked() {
+      return this.listenerInvoked;
+    }
+    public void close() {
+    }
+  }
+
+  class TestTxWriter extends txCallback implements TransactionWriter {
+    public TestTxWriter(boolean isAccessor) {
+      this.isAccessor = isAccessor;
+    }
+    public void beforeCommit(TransactionEvent event) {
+      verify(event);
+    }
+    public void close() {
+    }
+  }
+
+
+  
+  final CustId expectedCustId = new CustId(6);
+  final Customer expectedCustomer = new Customer("customer6", "address6");
+  class TXFunction implements Function {
+    static final String id = "TXFunction";
+    public void execute(FunctionContext context) {
+      Region r = null;
+      r = getGemfireCache().getRegion(CUSTOMER);
+      getGemfireCache().getLogger().fine("SWAP:callingPut");
+      r.put(expectedCustId, expectedCustomer);
+      GemFireCacheImpl.getInstance().getLogger().warning(" XXX DOIN A PUT ",new Exception());
+      context.getResultSender().lastResult(Boolean.TRUE);
+    }
+    public String getId() {
+      return id;
+    }
+    public boolean hasResult() {
+      return true;
+    }
+    public boolean optimizeForWrite() {
+      return true;
+    }
+    public boolean isHA() {
+      return false;
+    }
+  }
+  
+  enum Executions {
+    OnRegion,
+    OnMember
+  }
+  
+  /**
+   * @param ds1
+   * @param pr
+   * @return first key found on the given member
+   */
+  CustId getKeyOnMember(final DistributedMember owner,
+      PartitionedRegion pr) {
+    CustId retVal = null;
+    for (int i=0; i<5; i++) {
+      CustId custId = new CustId(i);
+      DistributedMember member = pr.getOwnerForKey(pr.getKeyInfo(custId));
+      if (member.equals(owner)) {
+        retVal = custId;
+        break;
+      }
+    }
+    return retVal;
+  }
+  /**
+   * @param i
+   * @return
+   */
+  protected Set<Customer> getCustomerSet(int size) {
+    Set<Customer> expectedSet = new HashSet<Customer>();
+    for (int i=0; i<size; i++) {
+      expectedSet.add(new Customer("customer"+i, "address"+i));
+    }
+    return expectedSet;
+  }
+
+  Set<CustId> getCustIdSet(int size) {
+    Set<CustId> expectedSet = new HashSet<CustId>();
+    for (int i=0; i<size; i++) {
+      expectedSet.add(new CustId(i));
+    }
+    return expectedSet;
+  }
+
+
+  class OneUpdateCacheListener extends CacheListenerAdapter {
+    boolean success = false;
+    
+    public boolean getSuccess() {
+      return success;
+    }
+    
+    @Override
+    public void afterCreate(EntryEvent event) {
+      fail("create not expected");
+    }
+    @Override
+    public void afterUpdate(EntryEvent event) {
+      if(!success) {
+        System.out.println("WE WIN!");
+        success = true;
+      } else {
+        fail("Should have only had one update");
+      }
+    }
+    @Override
+    public void afterDestroy(EntryEvent event) {
+      fail("destroy not expected");
+    }
+    @Override
+    public void afterInvalidate(EntryEvent event) {
+      fail("invalidate not expected");
+    }
+  }
+  
+  class OneDestroyAndThenOneCreateCacheWriter extends CacheWriterAdapter {
+    private boolean oneDestroy;
+    private boolean oneCreate;
+    
+    public void checkSuccess() throws Exception {
+      if(oneDestroy && oneCreate) {
+        // chill
+      } else {
+        fail("Didn't get both events. oneDestroy="+oneDestroy+" oneCreate="+oneCreate);
+      }
+    }
+
+    @Override
+    public void beforeCreate(EntryEvent event) throws CacheWriterException {
+     if(!oneDestroy) {
+       fail("destroy should have arrived in writer before create");
+     } else {
+       if(oneCreate) {
+         fail("more than one create detected! expecting destroy then create");
+       } else {
+         oneCreate = true;
+       }
+     }
+    }
+    @Override
+    public void beforeUpdate(EntryEvent event) throws CacheWriterException {
+        fail("update not expected");
+    }
+    @Override
+    public void beforeDestroy(EntryEvent event) throws CacheWriterException {
+      if(oneDestroy) {
+        fail("only one destroy expected");
+      } else {
+        if(oneCreate) {
+          fail("destroy is supposed to precede create");
+        } else {
+          oneDestroy = true;
+        }
+      }
+    }
+    
+  }
+
+  protected Integer startServer(VM vm) {
+    return (Integer) vm.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+        CacheServer s = getCache().addCacheServer();
+        s.setPort(port);
+        s.start();
+        return port;
+      }
+    });
+  }
+  protected void createClientRegion(VM vm, final int port, final boolean isEmpty, final boolean ri, final boolean CQ) {
+    vm.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        ClientCacheFactory ccf = new ClientCacheFactory();
+        ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port);
+        ccf.setPoolSubscriptionEnabled(true);
+        ccf.set("log-level", getDUnitLogLevel());
+        ClientCache cCache = getClientCache(ccf);
+        ClientRegionFactory<Integer, String> crf = cCache
+            .createClientRegionFactory(isEmpty ? ClientRegionShortcut.PROXY
+                : ClientRegionShortcut.CACHING_PROXY);
+        crf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
+        crf.addCacheListener(new ClientListener());
+        Region r = crf.create(D_REFERENCE);
+        Region cust = crf.create(CUSTOMER);
+        Region order = crf.create(ORDER);
+        if (ri) {
+          r.registerInterestRegex(".*");
+          cust.registerInterestRegex(".*");
+          order.registerInterestRegex(".*");
+        }
+        if(CQ) {
+          CqAttributesFactory cqf = new CqAttributesFactory();
+          cqf.addCqListener(new ClientCQListener());
+          CqAttributes ca = cqf.create();
+          cCache.getQueryService().newCq("SELECT * FROM "+cust.getFullPath(), ca).execute();
+        }
+        return null;
+      }
+    });
+  }
+
+  protected class ClientCQListener implements CqListener {
+
+    boolean invoked = false;
+    public void onError(CqEvent aCqEvent) {
+      // TODO Auto-generated method stub
+      
+    }
+
+    public void onEvent(CqEvent aCqEvent) {
+      // TODO Auto-generated method stub
+      invoked =true;
+      
+    }
+
+    public void close() {
+      // TODO Auto-generated method stub
+      
+    }
+    
+  }
+  
+protected static class ClientListener extends CacheListenerAdapter {
+    boolean invoked = false;
+    int invokeCount = 0;
+    int invalidateCount = 0;
+    int putCount = 0;
+    boolean putAllOp = false;
+    boolean isOriginRemote = false;
+    int creates;
+    int updates;
+    
+    @Override
+    public void afterCreate(EntryEvent event) {
+      event.getRegion().getCache().getLogger().warning("ZZZ AFTER CREATE:"+event.getKey());
+      invoked = true;
+      invokeCount++;
+      putCount++;
+      creates++;
+      event.getRegion().getCache().getLogger().warning("ZZZ AFTER CREATE:"+event.getKey()+" isPutAll:"+event.getOperation().isPutAll()+" op:"+event.getOperation());
+      putAllOp = event.getOperation().isPutAll();
+      isOriginRemote = event.isOriginRemote();
+    }
+    @Override
+    public void afterUpdate(EntryEvent event) {
+    	event.getRegion().getCache().getLogger().warning("ZZZ AFTER UPDATE:"+event.getKey()+" isPutAll:"+event.getOperation().isPutAll()+" op:"+event.getOperation());
+        putAllOp = event.getOperation().isPutAll();
+      invoked = true;
+      invokeCount++;
+      putCount++;
+      updates++;
+      isOriginRemote = event.isOriginRemote();
+    }
+    
+    @Override
+    public void afterInvalidate(EntryEvent event) {
+      event.getRegion().getCache().getLogger().warning("ZZZ AFTER UPDATE:"+event.getKey());
+      invoked = true;
+      invokeCount++;
+      invalidateCount++;
+      isOriginRemote = event.isOriginRemote();
+    }
+    
+    public void reset() {
+    	invoked = false;
+    	invokeCount = 0;
+    	invalidateCount = 0;
+    	putCount = 0;
+    	isOriginRemote = false;
+    	creates = 0;
+    	updates = 0;
+    }
+  }
+  
+  protected static class ServerListener extends CacheListenerAdapter {
+    boolean invoked = false;
+    int creates;
+    int updates;
+    @Override
+    public void afterCreate(EntryEvent event) {
+      invoked = true;
+      creates++;
+    }
+    @Override
+    public void afterUpdate(EntryEvent event) {
+      invoked = true;
+      updates++;
+    }
+    @Override
+    public void afterDestroy(EntryEvent event) {
+      invoked = true;
+    }
+    @Override
+    public void afterInvalidate(EntryEvent event) {
+      invoked = true;
+    }
+  }
+  
+  private static final String EMPTY_REGION = "emptyRegionName";
+  
+  public void testTXWithCQCommitInDatastoreCQ() throws Exception {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    initAccessorAndDataStore(accessor, datastore, 0);
+    int port = startServer(datastore);
+    
+    createClientRegion(client, port, false, true, true);
+    datastore.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        CustId custId = new CustId(1);
+        OrderId orderId = new OrderId(1, custId);
+        getCache().getCacheTransactionManager().begin();
+        custRegion.put(custId, new Customer("foo", "bar"));
+        orderRegion.put(orderId, new Order("fooOrder"));
+        refRegion.put(custId, new Customer("foo", "bar"));
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    
+    Thread.sleep(10000);
+    client.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
+        getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked);
+        
+        assertTrue(((ClientCQListener)custRegion.getCache().getQueryService().getCqs()[0].getCqAttributes().getCqListener()).invoked);
+        assertTrue(cl.invoked);
+        return null;
+      }
+    });
+  }
+  
+  
+  public void testTXWithCQCommitInDatastoreConnectedToAccessorCQ() throws Exception {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    initAccessorAndDataStore(accessor, datastore, 0);
+    int port = startServer(accessor);
+    
+    createClientRegion(client, port, false, true, true);
+    datastore.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        CustId custId = new CustId(1);
+        OrderId orderId = new OrderId(1, custId);
+        getCache().getCacheTransactionManager().begin();
+        custRegion.put(custId, new Customer("foo", "bar"));
+//        orderRegion.put(orderId, new Order("fooOrder"));
+//        refRegion.put(custId, new Customer("foo", "bar"));
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    
+    Thread.sleep(10000);
+    client.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
+        getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked);
+        assertTrue(cl.invoked);
+        assertTrue(((ClientCQListener)custRegion.getCache().getQueryService().getCqs()[0].getCqAttributes().getCqListener()).invoked);
+        return null;
+      }
+    });
+  }
+  
+  
+  public void testTXWithCQCommitInDatastoreConnectedToDatastoreCQ() throws Exception {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    initAccessorAndDataStore(accessor, datastore, 0);
+    int port = startServer(datastore);
+    
+    createClientRegion(client, port, false, true, true);
+    datastore.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        CustId custId = new CustId(1);
+        OrderId orderId = new OrderId(1, custId);
+        getCache().getCacheTransactionManager().begin();
+        custRegion.put(custId, new Customer("foo", "bar"));
+//        orderRegion.put(orderId, new Order("fooOrder"));
+//        refRegion.put(custId, new Customer("foo", "bar"));
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    
+    Thread.sleep(10000);
+    client.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
+        getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked);
+        assertTrue(cl.invoked);
+        assertTrue(((ClientCQListener)custRegion.getCache().getQueryService().getCqs()[0].getCqAttributes().getCqListener()).invoked);
+        return null;
+      }
+    });
+  }
+  
+  public void testTXWithCQCommitInAccessorConnectedToDatastoreCQ() throws Exception {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    initAccessorAndDataStore(accessor, datastore, 0);
+    int port = startServer(datastore);
+    
+    createClientRegion(client, port, false, true, true);
+    accessor.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        CustId custId = new CustId(1);
+        OrderId orderId = new OrderId(1, custId);
+        getCache().getCacheTransactionManager().begin();
+        custRegion.put(custId, new Customer("foo", "bar"));
+//        orderRegion.put(orderId, new Order("fooOrder"));
+//        refRegion.put(custId, new Customer("foo", "bar"));
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    
+    Thread.sleep(10000);
+    client.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
+        getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked);
+        assertTrue(cl.invoked);
+        assertTrue(((ClientCQListener)custRegion.getCache().getQueryService().getCqs()[0].getCqAttributes().getCqListener()).invoked);
+        return null;
+      }
+    });
+  }
+  
+  public void testTXWithCQCommitInAccessorConnectedToAccessorCQ() throws Exception {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    initAccessorAndDataStore(accessor, datastore, 0);
+    int port = startServer(accessor);
+    
+    createClientRegion(client, port, false, true, true);
+    accessor.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        CustId custId = new CustId(1);
+        OrderId orderId = new OrderId(1, custId);
+        getCache().getCacheTransactionManager().begin();
+        custRegion.put(custId, new Customer("foo", "bar"));
+//        orderRegion.put(orderId, new Order("fooOrder"));
+//        refRegion.put(custId, new Customer("foo", "bar"));
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    
+    Thread.sleep(10000);
+    client.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
+        getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked);
+        assertTrue(cl.invoked);
+        assertTrue(((ClientCQListener)custRegion.getCache().getQueryService().getCqs()[0].getCqAttributes().getCqListener()).invoked);
+        return null;
+      }
+    });
+  }
+  
+  
+  public void testCQCommitInDatastoreConnectedToAccessorCQ() throws Exception {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    initAccessorAndDataStore(accessor, datastore, 0);
+    int port = startServer(accessor);
+    
+    createClientRegion(client, port, false, true, true);
+    datastore.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        CustId custId = new CustId(1);
+        OrderId orderId = new OrderId(1, custId);
+        getCache().getCacheTransactionManager().begin();
+        custRegion.put(custId, new Customer("foo", "bar"));
+//        orderRegion.put(orderId, new Order("fooOrder"));
+//        refRegion.put(custId, new Customer("foo", "bar"));
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    
+    Thread.sleep(10000);
+    client.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
+        Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
+        Region<CustId,Customer> refRegion = getCache().getRegion(D_REFERENCE);
+        ClientListener cl = (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
+        getCache().getLogger().info("SWAP:CLIENTinvoked:"+cl.invoked);
+        assertTrue(cl.invoked);
+        assertTrue(((ClientCQListener)custRegion.getCache().getQueryService().getCqs()[0].getCqAttributes().getCqListener()).invoked);
+        return null;
+      }
+    });
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/CQListGIIDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/CQListGIIDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/CQListGIIDUnitTest.java
new file mode 100755
index 0000000..05eb566
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/CQListGIIDUnitTest.java
@@ -0,0 +1,814 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.internal.cache.ha;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.PoolFactory;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryTestListener;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ConflationDUnitTest;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+/**
+ *
+ *
+ * @author ashetkar
+ * @since 5.7
+ *
+ */
+public class CQListGIIDUnitTest extends DistributedTestCase {
+  private final static int CREATE = 0;
+
+  private final static int UPDATE = 1;
+
+  private final static int DESTROY = 2;
+
+  private final static int INVALIDATE = 3;
+
+  private final static int CLOSE = 4;
+
+  private final static int REGION_CLEAR = 5;
+
+  private final static int REGION_INVALIDATE = 6;
+
+  protected static Cache cache = null;
+
+  protected static VM serverVM0 = null;
+
+  private static VM serverVM1 = null;
+
+  protected static VM clientVM1 = null;
+
+  protected static VM clientVM2 = null;
+
+  private static int PORT1;
+
+  private static int PORT2;
+
+  private static final String regionName = "CQListGIIDUnitTest";
+
+  private static final Map map = new HashMap();
+
+  private static LogWriter logger = null;
+
+  public static final String[] regions = new String[] { "regionA", "regionB" };
+
+  public static final String KEY = "key-";
+
+  public String[] cqs = new String[] {
+  // 0 - Test for ">"
+      "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0",
+
+      // 1 - Test for "=" and "and".
+      "SELECT ALL * FROM /root/" + regions[0]
+          + " p where p.ID = 2 and p.status='active'",
+
+      // 2 - Test for "<" and "and".
+      "SELECT ALL * FROM /root/" + regions[1]
+          + " p where p.ID < 5 and p.status='active'",
+
+      // FOLLOWING CQS ARE NOT TESTED WITH VALUES; THEY ARE USED TO TEST PARSING
+      // LOGIC WITHIN CQ.
+      // 3
+      "SELECT * FROM /root/" + regions[0] + " ;",
+      // 4
+      "SELECT ALL * FROM /root/" + regions[0],
+      // 5
+      "import com.gemstone.gemfire.cache.\"query\".data.Portfolio; "
+          + "SELECT ALL * FROM /root/" + regions[0] + " TYPE Portfolio",
+      // 6
+      "import com.gemstone.gemfire.cache.\"query\".data.Portfolio; "
+          + "SELECT ALL * FROM /root/" + regions[0] + " p TYPE Portfolio",
+      // 7
+      "SELECT ALL * FROM /root/" + regions[1]
+          + " p where p.ID < 5 and p.status='active';",
+      // 8
+      "SELECT ALL * FROM /root/" + regions[0] + "  ;",
+      // 9
+      "SELECT ALL * FROM /root/" + regions[0] + " p where p.description = NULL",
+      // 10
+      "SELECT ALL * FROM /root/" + regions[0]
+          + " p where p.ID > 0 and p.status='active'", };
+
+  /**
+   * @param name
+   *          name of the test
+   */
+  public CQListGIIDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Sets up the test.
+   */
+  public void setUp() throws Exception {
+    super.setUp();
+
+    final Host host = Host.getHost(0);
+    serverVM0 = host.getVM(0);
+    serverVM1 = host.getVM(1);
+    clientVM1 = host.getVM(2);
+    clientVM2 = host.getVM(3);
+
+    PORT1 = ((Integer)serverVM0.invoke(CQListGIIDUnitTest.class,
+        "createServerCache",
+        new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
+    PORT2 = ((Integer)serverVM1.invoke(CQListGIIDUnitTest.class,
+        "createServerCache",
+        new Object[] { HARegionQueue.HA_EVICTION_POLICY_ENTRY })).intValue();
+  }
+
+  /**
+   * Tears down the test.
+   */
+  public void tearDown2() throws Exception {
+    super.tearDown2();
+    serverVM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
+    serverVM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
+    closeCache();
+    clientVM1.invoke(CQListGIIDUnitTest.class, "closeCache");
+    clientVM2.invoke(CQListGIIDUnitTest.class, "closeCache");
+    // then close the servers
+    serverVM0.invoke(CQListGIIDUnitTest.class, "closeCache");
+    serverVM1.invoke(CQListGIIDUnitTest.class, "closeCache");
+    disconnectAllFromDS();
+  }
+
+  private void createCache(Properties props) throws Exception {
+    DistributedSystem ds = getSystem(props);
+    ds.disconnect();
+    ds = getSystem(props);
+    assertNotNull(ds);
+    cache = CacheFactory.create(ds);
+    assertNotNull(cache);
+  }
+
+  public static Integer createServerCache() throws Exception {
+    return createServerCache(null);
+  }
+
+  public static Integer createServerCache(String ePolicy) throws Exception {
+    return createServerCache(ePolicy, Integer.valueOf(1));
+  }
+
+  public static Integer createServerCache(String ePolicy, Integer cap)
+      throws Exception {
+    new CQListGIIDUnitTest("temp").createCache(new Properties());
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setDataPolicy(DataPolicy.REPLICATE);
+    RegionAttributes attrs = factory.create();
+    // cache.createRegion(regionName, attrs);
+    createRegion(regions[0], "root", attrs);
+    createRegion(regions[1], "root", attrs);
+    Thread.sleep(2000);
+    logger = cache.getLogger();
+
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    CacheServer server1 = cache.addCacheServer();
+    server1.setPort(port);
+    server1.setNotifyBySubscription(true);
+    if (ePolicy != null) {
+      File overflowDirectory = new File("bsi_overflow_"+port);
+      overflowDirectory.mkdir();
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      File[] dirs1 = new File[] {overflowDirectory};
+
+      server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy);
+      server1.getClientSubscriptionConfig().setCapacity(cap.intValue());
+      // specify diskstore for this server
+      server1.getClientSubscriptionConfig().setDiskStoreName(dsf.setDiskDirs(dirs1).create("bsi").getName());
+    }
+    server1.start();
+    Thread.sleep(2000);
+    return Integer.valueOf(server1.getPort());
+  }
+
+  public static Integer createOneMoreBridgeServer(Boolean notifyBySubscription)
+      throws Exception {
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    CacheServer server1 = cache.addCacheServer();
+    server1.setPort(port);
+    server1.setNotifyBySubscription(notifyBySubscription.booleanValue());
+    server1.getClientSubscriptionConfig().setEvictionPolicy(
+        HARegionQueue.HA_EVICTION_POLICY_MEMORY);
+    // let this server to use default diskstore
+    server1.start();
+    return Integer.valueOf(server1.getPort());
+  }
+
+  public static final Region createRegion(String name, String rootName,
+      RegionAttributes attrs) throws CacheException {
+    Region root = cache.getRegion(rootName);
+    if (root == null) {
+      // don't put listeners on root region
+      RegionAttributes rootAttrs = attrs;
+      AttributesFactory fac = new AttributesFactory(attrs);
+      ExpirationAttributes expiration = ExpirationAttributes.DEFAULT;
+
+      // fac.setCacheListener(null);
+      fac.setCacheLoader(null);
+      fac.setCacheWriter(null);
+      fac.setPoolName(null);
+      fac.setPartitionAttributes(null);
+      fac.setRegionTimeToLive(expiration);
+      fac.setEntryTimeToLive(expiration);
+      fac.setRegionIdleTimeout(expiration);
+      fac.setEntryIdleTimeout(expiration);
+      rootAttrs = fac.create();
+      root = cache.createRegion(rootName, rootAttrs);
+    }
+
+    return createSubregion(root, name, attrs, null);
+  }
+
+  /**
+   * A helper for creating a subregion, potentially using a package protected
+   * method to do so.  
+   * @param root the parent region
+   * @param name the name of the subregion to create
+   * @param attrs the attributes used to create the subregion
+   * @param internalArgs if not null, then use the package protected creation mechanism
+   * @return the subregion whose parent is the provided root
+   * @throws CacheException
+   * @see Region#createSubregion(String, RegionAttributes)
+   * @see LocalRegion#createSubregion(String, RegionAttributes, InternalRegionArguments)
+   */
+  public static Region createSubregion(Region root, String name,
+      RegionAttributes attrs, final InternalRegionArguments internalArgs) throws CacheException
+  {
+    if (internalArgs == null) {
+      return root.createSubregion(name, attrs);
+    } else {
+      try {
+        LocalRegion lr = (LocalRegion) root;
+        return lr.createSubregion(name, attrs, internalArgs);
+      } catch (IOException ioe) {
+        AssertionError assErr = new AssertionError("unexpected exception");
+        assErr.initCause(ioe);
+        throw assErr;
+      } catch (ClassNotFoundException cnfe) {
+        AssertionError assErr = new AssertionError("unexpected exception");
+        assErr.initCause(cnfe);
+        throw assErr;
+      } 
+    }
+  }
+  
+  public static void createClientCache(Integer port1, Integer port2,
+      String rLevel) throws Exception {
+    createClientCache(port1, port2, Integer.valueOf(-1), rLevel, Boolean.FALSE);
+  }
+
+  public static void createClientCache(Integer port1, Integer port2,
+      String rLevel, Boolean addListener) throws Exception {
+    createClientCache(port1, port2, Integer.valueOf(-1), rLevel, addListener);
+  }
+
+  public static void createClientCache(Integer port1, Integer port2,
+      Integer port3, String rLevel) throws Exception {
+    createClientCache(port1, port2, port3, rLevel, Boolean.FALSE);
+  }
+
+  public static void destroyClientPool() {
+    cache.getRegion("root").getSubregion(regions[0]).close();
+    cache.getRegion("root").getSubregion(regions[1]).close();
+    PoolManager.find("clientPool").destroy();
+  }
+  
+  public static void createClientCache(Integer port1, Integer port2,
+      Integer port3, String rLevel, Boolean addListener) throws Exception {
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    String host = DistributedTestCase.getIPLiteral();
+
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+    new CQListGIIDUnitTest("temp").createCache(props);
+
+    PoolFactory pf = PoolManager.createFactory();
+    int endPointCount = 1;
+    pf.addServer(host, port1);
+    if (port2.intValue() != -1) {
+      pf.addServer(host, port2);
+      endPointCount++;
+    }
+    if (port3.intValue() != -1) {
+      pf.addServer(host, port3);
+      endPointCount++;
+    }
+    pf.setRetryAttempts(5);
+    pf.setReadTimeout(2500);
+    pf.setSocketBufferSize(32768);
+    pf.setPingInterval(1000);
+    pf.setMinConnections(endPointCount*2);
+    pf.setSubscriptionRedundancy(Integer.parseInt(rLevel));
+    pf.setSubscriptionEnabled(true).create("clientPool");
+
+    try {
+      cache.getQueryService();
+    }
+    catch (Exception cqe) {
+      fail("Failed to getCQService.", cqe);
+    }
+
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setPoolName("clientPool");
+
+    RegionAttributes attrs = factory.create();
+    createRegion(regions[0], "root", attrs);
+    createRegion(regions[1], "root", attrs);
+    logger = cache.getLogger();
+  }
+
+  /* Register CQs */
+  public static void createCQ(String cqName, String queryStr) {
+    getLogWriter().info("### Create CQ. ###" + cqName);
+    // Get CQ Service.
+    QueryService cqService = null;
+    try {
+      cqService = cache.getQueryService();
+    }
+    catch (Exception cqe) {
+      fail("Failed to getCQService.", cqe);
+    }
+    // Create CQ Attributes.
+    CqAttributesFactory cqf = new CqAttributesFactory();
+    CqListener[] cqListeners = { new CqQueryTestListener(getLogWriter()) };
+    ((CqQueryTestListener)cqListeners[0]).cqName = cqName;
+
+    cqf.initCqListeners(cqListeners);
+    CqAttributes cqa = cqf.create();
+
+    // Create CQ.
+    try {
+      CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
+      assertTrue("newCq() state mismatch", cq1.getState().isStopped());
+    }
+    catch (Exception ex) {
+      getLogWriter().info("CqService is :" + cqService);
+      ex.printStackTrace();
+      AssertionError err = new AssertionError("Failed to create CQ " + cqName
+          + " . ");
+      err.initCause(ex);
+      throw err;
+    }
+  }
+
+  public static void executeCQ(String cqName, Boolean initialResults) {
+    getLogWriter().info("### DEBUG EXECUTE CQ START ####");
+    // Get CQ Service.
+    QueryService cqService = null;
+    CqQuery cq1 = null;
+    cqService = cache.getQueryService();
+
+    // Get CqQuery object.
+    try {
+      cq1 = cqService.getCq(cqName);
+      if (cq1 == null) {
+        getLogWriter().info(
+            "Failed to get CqQuery object for CQ name: " + cqName);
+        fail("Failed to get CQ " + cqName, new Exception("Failed to get CQ "
+            + cqName));
+      }
+      else {
+        getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
+        assertTrue("newCq() state mismatch", cq1.getState().isStopped());
+      }
+    }
+    catch (Exception ex) {
+      getLogWriter().info("CqService is :" + cqService);
+      getLogWriter().error(ex);
+      AssertionError err = new AssertionError("Failed to execute  CQ " + cqName);
+      err.initCause(ex);
+      throw err;
+    }
+
+    if (initialResults.booleanValue()) {
+      SelectResults cqResults = null;
+
+      try {
+        cqResults = cq1.executeWithInitialResults();
+      }
+      catch (Exception ex) {
+        getLogWriter().info("CqService is :" + cqService);
+        ex.printStackTrace();
+        AssertionError err = new AssertionError("Failed to execute  CQ "
+            + cqName);
+        err.initCause(ex);
+        throw err;
+      }
+      getLogWriter().info("initial result size = " + cqResults.size());
+      assertTrue("executeWithInitialResults() state mismatch", cq1.getState()
+          .isRunning());
+      // if (expectedResultsSize >= 0) {
+      // assertEquals("unexpected results size", expectedResultsSize, cqResults
+      // .size());
+      // }
+    }
+    else {
+
+      try {
+        cq1.execute();
+      }
+      catch (Exception ex) {
+        getLogWriter().info("CqService is :" + cqService);
+        ex.printStackTrace();
+        AssertionError err = new AssertionError("Failed to execute  CQ "
+            + cqName);
+        err.initCause(ex);
+        throw err;
+      }
+      assertTrue("execute() state mismatch", cq1.getState().isRunning());
+    }
+  }
+
+  public static void registerInterestListCQ(String regionName, int keySize) {
+    // Get CQ Service.
+    Region region = null;
+    try {
+      region = cache.getRegion("root").getSubregion(regionName);
+      region.getAttributesMutator().setCacheListener(
+          new CertifiableTestCacheListener(getLogWriter()));
+    }
+    catch (Exception cqe) {
+      AssertionError err = new AssertionError("Failed to get Region.");
+      err.initCause(cqe);
+      throw err;
+    }
+
+    try {
+      List list = new ArrayList();
+      for (int i = 1; i <= keySize; i++) {
+        list.add(KEY + i);
+      }
+      region.registerInterest(list);
+    }
+    catch (Exception ex) {
+      AssertionError err = new AssertionError("Failed to Register InterestList");
+      err.initCause(ex);
+      throw err;
+    }
+  }
+
+  public static void waitForCreated(String cqName, String key) {
+    waitForEvent(0, cqName, key);
+  }
+
+  public static void waitForEvent(int event, String cqName, String key) {
+    // Get CQ Service.
+    QueryService cqService = null;
+    try {
+      cqService = cache.getQueryService();
+    }
+    catch (Exception cqe) {
+      cqe.printStackTrace();
+      fail("Failed to getCQService.", cqe);
+    }
+
+    CqQuery cQuery = cqService.getCq(cqName);
+    if (cQuery == null) {
+      fail("Failed to get CqQuery for CQ : " + cqName, new Exception(
+          "Failed to get CqQuery for CQ : " + cqName));
+    }
+
+    CqAttributes cqAttr = cQuery.getCqAttributes();
+    CqListener[] cqListener = cqAttr.getCqListeners();
+    CqQueryTestListener listener = (CqQueryTestListener)cqListener[0];
+
+    switch (event) {
+      case CREATE:
+        listener.waitForCreated(key);
+        break;
+
+      case UPDATE:
+        listener.waitForUpdated(key);
+        break;
+
+      case DESTROY:
+        listener.waitForDestroyed(key);
+        break;
+
+      case INVALIDATE:
+        listener.waitForInvalidated(key);
+        break;
+
+      case CLOSE:
+        listener.waitForClose();
+        break;
+
+      case REGION_CLEAR:
+        listener.waitForRegionClear();
+        break;
+
+      case REGION_INVALIDATE:
+        listener.waitForRegionInvalidate();
+        break;
+
+    }
+  }
+
+  public static void registerInterestListAll() {
+    try {
+      Region r = cache.getRegion("/" + regionName);
+      assertNotNull(r);
+      r.registerInterest("ALL_KEYS");
+    }
+    catch (Exception ex) {
+      fail("failed in registerInterestListAll", ex);
+    }
+  }
+
+  public static void registerInterestList() {
+    try {
+      Region r = cache.getRegion("/" + regionName);
+      assertNotNull(r);
+      r.registerInterest("k1");
+      r.registerInterest("k3");
+      r.registerInterest("k5");
+    }
+    catch (Exception ex) {
+      fail("failed while registering keys", ex);
+    }
+  }
+
+  public static void putEntries(String rName, Integer num) {
+    try {
+      Region r = cache.getRegion("root").getSubregion(rName);
+      assertNotNull(r);
+      for (int i = 0; i < num.longValue(); i++) {
+        r.put(KEY + i, new Portfolio(i + 1));
+      }
+      getLogWriter().info(
+          "### Number of Entries in Region " + rName + ": " + r.keys().size());
+    }
+    catch (Exception ex) {
+      fail("failed in putEntries()", ex);
+    }
+  }
+
+  /**
+   *
+   *
+   * @throws Exception
+   */
+  public void _testSpecificClientCQIsGIIedPart1() throws Exception {
+    Integer size = Integer.valueOf(10);
+    // slow start for dispatcher
+    serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
+        new Object[] { "30000" });
+    serverVM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
+        new Object[] { "30000" });
+
+    // createClientCache(Integer.valueOf(PORT1), Integer.valueOf(PORT2), "1");
+    clientVM1.invoke(CQListGIIDUnitTest.class, "createClientCache",
+        new Object[] { Integer.valueOf(PORT1), Integer.valueOf(PORT2), "1" });
+    clientVM2.invoke(CQListGIIDUnitTest.class, "createClientCache",
+        new Object[] { Integer.valueOf(PORT1), Integer.valueOf(PORT2), "0" });
+
+    clientVM1.invoke(CQListGIIDUnitTest.class, "createCQ", new Object[] {
+        "testSpecificClientCQIsGIIed_0", cqs[0] });
+    clientVM1.invoke(CQListGIIDUnitTest.class, "executeCQ", new Object[] {
+        "testSpecificClientCQIsGIIed_0", Boolean.FALSE });
+    clientVM2.invoke(CQListGIIDUnitTest.class, "createCQ", new Object[] {
+        "testSpecificClientCQIsGIIed_0", cqs[0] });
+    clientVM2.invoke(CQListGIIDUnitTest.class, "executeCQ", new Object[] {
+        "testSpecificClientCQIsGIIed_0", Boolean.FALSE });
+
+    serverVM1.invoke(CQListGIIDUnitTest.class, "stopServer");
+
+    serverVM0.invoke(CQListGIIDUnitTest.class, "putEntries", new Object[] {
+        regions[0], size });
+
+    serverVM1.invoke(CQListGIIDUnitTest.class, "startServer");
+    Thread.sleep(3000); // TODO: Find a better 'n reliable alternative
+
+    serverVM0.invoke(CQListGIIDUnitTest.class, "VerifyCUMCQList", new Object[] {
+        size, Integer.valueOf(2) });
+    serverVM1.invoke(CQListGIIDUnitTest.class, "VerifyCUMCQList", new Object[] {
+        size, Integer.valueOf(1) });
+    serverVM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
+    serverVM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
+  }
+
+  /**
+   * This test asserts that cq list of a client for an event is not lost if that
+   * client's queue has been GII'ed to a server where that event already
+   * existed.
+   *
+   * @throws Exception
+   */
+  public void testClientCQNotLostAtGIIReceiver() throws Exception {
+    Integer size = Integer.valueOf(10);
+    VM serverVM2 = clientVM2;
+
+    int port3 = ((Integer)serverVM2.invoke(CQListGIIDUnitTest.class,
+        "createServerCache",
+        new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
+
+    // slow start for dispatcher
+    serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
+        new Object[] { "45000" });
+
+    // createClientCache(Integer.valueOf(PORT1), Integer.valueOf(PORT2), "1");
+    createClientCache(Integer.valueOf(PORT1), Integer.valueOf(PORT2),
+        Integer.valueOf(port3), "1");
+    try {
+    clientVM1.invoke(CQListGIIDUnitTest.class, "createClientCache",
+        new Object[] { Integer.valueOf(PORT1), Integer.valueOf(port3),
+            Integer.valueOf(PORT2), "1" });
+    try {
+    createCQ("testSpecificClientCQIsGIIed_0", cqs[0]);
+    executeCQ("testSpecificClientCQIsGIIed_0", Boolean.FALSE);
+    clientVM1.invoke(CQListGIIDUnitTest.class, "createCQ", new Object[] {
+        "testSpecificClientCQIsGIIed_0", cqs[0] });
+    clientVM1.invoke(CQListGIIDUnitTest.class, "executeCQ", new Object[] {
+        "testSpecificClientCQIsGIIed_0", Boolean.FALSE });
+
+    serverVM0.invoke(CQListGIIDUnitTest.class, "putEntries", new Object[] {
+        regions[0], size });
+
+    serverVM1.invoke(CQListGIIDUnitTest.class, "VerifyCUMCQList", new Object[] {
+        size, Integer.valueOf(1) });
+
+    serverVM2.invoke(CQListGIIDUnitTest.class, "stopServer");
+    Thread.sleep(3000); // TODO: Find a better 'n reliable alternative
+
+    serverVM0.invoke(CQListGIIDUnitTest.class, "VerifyCUMCQList", new Object[] {
+      size, Integer.valueOf(2) });
+    serverVM1.invoke(CQListGIIDUnitTest.class, "VerifyCUMCQList", new Object[] {
+        size, Integer.valueOf(2) });
+    } finally {
+      clientVM1.invoke(CQListGIIDUnitTest.class, "destroyClientPool");
+    }
+
+    } finally {
+      destroyClientPool();
+    }
+  }
+
+  public static void VerifyCUMCQList(Integer numOfKeys, Integer numOfClients) {
+    try {
+      Iterator iter = cache.getCacheServers().iterator();
+      if (iter.hasNext()) {
+        CacheServerImpl server = (CacheServerImpl)iter.next();
+        Map haContainer = server.getAcceptor().getCacheClientNotifier()
+            .getHaContainer();
+        Object[] keys = haContainer.keySet().toArray();
+        logger.fine("### numOfKeys :" + numOfKeys.intValue() + " keys.length : " + keys.length +
+            " haContainer size : " + haContainer.size());
+        assertEquals(numOfKeys.intValue(), keys.length);
+        for (int i = 0; i < numOfKeys.intValue(); i++) {
+          logger.fine("i=: " + i);
+          ClientUpdateMessageImpl cum = (ClientUpdateMessageImpl)haContainer
+              .get(keys[i]);
+          assertNotNull(cum);
+          assertNotNull(cum.getClientCqs());
+          assertEquals("This test may fail if the image provider gets an ack from client before providing image",
+              numOfClients.intValue(), cum.getClientCqs().size());
+        }
+      }
+    }
+    catch (Exception e) {
+      fail("failed in VerifyCUMCQList()" + e, e);
+    }
+  }
+
+  private static void stopOneBridgeServer(Integer port) {
+    try {
+      Iterator iter = cache.getCacheServers().iterator();
+      if (iter.hasNext()) {
+        CacheServer server = (CacheServer)iter.next();
+        if (server.getPort() == port.intValue()) {
+          server.stop();
+        }
+      }
+    }
+    catch (Exception e) {
+      fail("failed in stopOneBridgeServer()" + e);
+    }
+  }
+
+  public static void stopServer() {
+    try {
+      Iterator iter = cache.getCacheServers().iterator();
+      if (iter.hasNext()) {
+        CacheServer server = (CacheServer)iter.next();
+        server.stop();
+      }
+    }
+    catch (Exception e) {
+      fail("failed in stopServer()" + e);
+    }
+  }
+
+  public static void startServer() {
+    try {
+      Iterator iter = cache.getCacheServers().iterator();
+      if (iter.hasNext()) {
+        CacheServer server = (CacheServer)iter.next();
+        server.start();
+      }
+    }
+    catch (Exception e) {
+      fail("failed in startServer()" + e);
+    }
+  }
+
+  public static void waitTillMessagesAreDispatched(Integer port, Long waitLimit) {
+    try {
+      boolean dispatched = false;
+      Map haContainer = null;
+      haContainer = cache.getRegion(Region.SEPARATOR
+          + CacheServerImpl.generateNameForClientMsgsRegion(port.intValue()));
+      if (haContainer == null) {
+        Object[] servers = cache.getCacheServers().toArray();
+        for (int i = 0; i < servers.length; i++) {
+          if (port.intValue() == ((CacheServerImpl)servers[i]).getPort()) {
+            haContainer = ((CacheServerImpl)servers[i]).getAcceptor()
+                .getCacheClientNotifier().getHaContainer();
+            break;
+          }
+        }
+      }
+      long startTime = System.currentTimeMillis();
+      while (waitLimit.longValue() > (System.currentTimeMillis() - startTime)) {
+        if (haContainer.size() == 0) {
+          dispatched = true;
+          break;
+        }
+        try {
+          Thread.sleep(50);
+        }
+        catch (InterruptedException ie) {
+          fail("interrupted");
+        }
+      }
+      logger.fine("Exiting sleep, time elapsed was: "
+          + (System.currentTimeMillis() - startTime));
+      if (!dispatched) {
+        throw new Exception(
+            "Test tuning issue: The HARegionQueue is not fully drained, so cannot continue the test.");
+      }
+    }
+    catch (Exception e) {
+      fail("failed in waitTillMessagesAreDispatched()" + e);
+    }
+  }
+
+  public static void closeCache() {
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache.getDistributedSystem().disconnect();
+    }
+  }
+
+}


Mime
View raw message