Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F02D12004F5 for ; Sat, 12 Aug 2017 01:52:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EE57216E494; Fri, 11 Aug 2017 23:52:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EA55F16E48E for ; Sat, 12 Aug 2017 01:52:26 +0200 (CEST) Received: (qmail 74916 invoked by uid 500); 11 Aug 2017 23:52:21 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 73983 invoked by uid 99); 11 Aug 2017 23:52:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Aug 2017 23:52:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9A14F565D; Fri, 11 Aug 2017 23:52:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbarrett@apache.org To: commits@geode.apache.org Date: Fri, 11 Aug 2017 23:52:56 -0000 Message-Id: <519343103a274970bce0094dbc9c9d33@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [40/52] [partial] geode-native git commit: GEODE-3165: Reogranized sources relative to the root for better CMake IDE integration. archived-at: Fri, 11 Aug 2017 23:52:29 -0000 http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientDeltaTestN.cs ---------------------------------------------------------------------- diff --git a/clicache/integration-test/ThinClientDeltaTestN.cs b/clicache/integration-test/ThinClientDeltaTestN.cs new file mode 100644 index 0000000..a7ee6b3 --- /dev/null +++ b/clicache/integration-test/ThinClientDeltaTestN.cs @@ -0,0 +1,914 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; + +#pragma warning disable 618 + +namespace Apache.Geode.Client.UnitTests +{ + using NUnit.Framework; + using Apache.Geode.DUnitFramework; + using Apache.Geode.Client.Tests; + using Apache.Geode.Client; + using DeltaEx = Apache.Geode.Client.Tests.DeltaEx; + + public class CqDeltaListener : ICqListener + { + + public CqDeltaListener() + { + m_deltaCount = 0; + m_valueCount = 0; + } + + public void OnEvent(CqEvent aCqEvent) + { + byte[] deltaValue = aCqEvent.getDeltaValue(); + DeltaTestImpl newValue = new DeltaTestImpl(); + DataInput input = CacheHelper.DCache.CreateDataInput(deltaValue); + newValue.FromDelta(input); + if (newValue.GetIntVar() == 5) + { + m_deltaCount++; + } + DeltaTestImpl fullObject = (DeltaTestImpl)(object)aCqEvent.getNewValue(); + if (fullObject.GetIntVar() == 5) + { + m_valueCount++; + } + + } + + public void OnError(CqEvent aCqEvent) + { + } + + public void Close() + { + } + + public int GetDeltaCount() + { + return m_deltaCount; + } + + public int GetValueCount() + { + return m_valueCount; + } + + private int m_deltaCount; + private int m_valueCount; + } + + public class DeltaTestAD : IGeodeDelta, IGeodeSerializable + { + private int _deltaUpdate; + private string _staticData; + + public static DeltaTestAD Create() + { + return new DeltaTestAD(); + } + + public DeltaTestAD() + { + _deltaUpdate = 1; + _staticData = "Data which don't get updated"; + } + + + #region IGeodeDelta Members + + public void FromDelta(DataInput input) + { + _deltaUpdate = input.ReadInt32(); + } + + public bool HasDelta() + { + _deltaUpdate++; + bool isDelta = (_deltaUpdate % 2) == 1; + Util.Log("In DeltaTestAD.HasDelta _deltaUpdate:" + _deltaUpdate + " : isDelta:" + isDelta); + return isDelta; + } + + public void ToDelta(DataOutput output) + { + output.WriteInt32(_deltaUpdate); + } + + #endregion + + #region IGeodeSerializable Members + + public uint ClassId + { + get { return 151; } + } + + public IGeodeSerializable FromData(DataInput input) + { + _deltaUpdate = input.ReadInt32(); + _staticData = input.ReadUTF(); + + return this; + } + + public uint ObjectSize + { + get { return (uint)(4 + _staticData.Length); } + } + + public void ToData(DataOutput output) + { + output.WriteInt32(_deltaUpdate); + output.WriteUTF(_staticData); + } + + public int DeltaUpdate + { + get { return _deltaUpdate; } + set { _deltaUpdate = value; } + } + + #endregion + } + + [TestFixture] + [Category("group1")] + [Category("unicast_only")] + [Category("generics")] + public class ThinClientDeltaTest : ThinClientRegionSteps + { + #region Private members + + private UnitProcess m_client1, m_client2; + private CqDeltaListener myCqListener; + + #endregion + + protected override ClientBase[] GetClients() + { + m_client1 = new UnitProcess(); + m_client2 = new UnitProcess(); + return new ClientBase[] { m_client1, m_client2 }; + } + + [TestFixtureTearDown] + public override void EndTests() + { + CacheHelper.StopJavaServers(); + base.EndTests(); + } + + [TearDown] + public override void EndTest() + { + try + { + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + finally + { + CacheHelper.StopJavaServers(); + CacheHelper.StopJavaLocators(); + } + base.EndTest(); + } + + public void createLRURegionAndAttachPool(string regionName, string poolName) + { + CacheHelper.CreateLRUTCRegion_Pool(regionName, true, true, null, null, poolName, false, 3); + } + + public void createRegionAndAttachPool(string regionName, string poolName) + { + createRegionAndAttachPool(regionName, poolName, false); + } + + public void createRegionAndAttachPool(string regionName, string poolName, bool cloningEnabled) + { + CacheHelper.CreateTCRegion_Pool(regionName, true, true, null, null, poolName, false, + false, cloningEnabled); + } + + //public void createPooledRegion(string regionName, string poolName, string endpoints, string locators) + //{ + // CacheHelper.CreateTCRegion_Pool(regionName, true, true, null, endpoints, locators, poolName, false); + //} + + public void createPool(string name, string locators, string serverGroup, + int redundancy, bool subscription) + { + CacheHelper.CreatePool(name, locators, serverGroup, redundancy, subscription); + } + + public void createExpirationRegion(string name, string poolName) + { + IRegion region = CacheHelper.CreateExpirationRegion(name, + poolName, ExpirationAction.LocalInvalidate, 5); + } + + public void createExpirationRegion(string name) + { + createExpirationRegion(name, null); + } + + public void CreateRegion(string name) + { + CreateRegion(name, false); + } + + public void CreateRegion(string name, bool enableNotification) + { + CreateRegion(name, enableNotification, false); + } + public void CreateRegion(string name, bool enableNotification, bool cloningEnabled) + { + Apache.Geode.Client.RegionAttributes attrs; + AttributesFactory attrFac = new AttributesFactory(); + attrFac.SetCacheListener(new SimpleCacheListener()); + attrFac.SetCloningEnabled(cloningEnabled); + attrs = attrFac.CreateRegionAttributes(); + CacheHelper.CreateRegion(name, attrs); + } + + //public void CreateOverflowRegion(string name, uint entriesLimit) + //{ + // AttributesFactory af = new AttributesFactory(); + // af.SetScope(ScopeType.DistributedAck); + // af.SetCachingEnabled(true); + // af.SetClientNotificationEnabled(true); + // af.SetLruEntriesLimit(entriesLimit);// LRU Entry limit set to 3 + + // af.SetDiskPolicy(DiskPolicyType.Overflows); + // Properties bdbProperties = Properties.Create(); + // bdbProperties.Insert("CacheSizeGb", "0"); + // bdbProperties.Insert("CacheSizeMb", "512"); + // bdbProperties.Insert("PageSize", "65536"); + // bdbProperties.Insert("MaxFileSize", "512000000"); + // String wdPath = Directory.GetCurrentDirectory(); + // String absPersistenceDir = wdPath + "/absBDB"; + // String absEnvDir = wdPath + "/absBDBEnv"; + // bdbProperties.Insert("PersistenceDirectory", absPersistenceDir); + // bdbProperties.Insert("EnvironmentDirectory", absEnvDir); + // af.SetPersistenceManager("BDBImpl", "createBDBInstance", bdbProperties); + + // CacheHelper.CreateRegion(name, af.CreateRegionAttributes()); + //} + + void DoPutWithDelta() + { + try + { + Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache); + } + catch (IllegalStateException) + { + //do nothng + } + string cKey = m_keys[0]; + DeltaEx val = new DeltaEx(); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + + reg[cKey] = (object)val; + val.SetDelta(true); + reg[cKey] = (object)val; + + DeltaEx val1 = new DeltaEx(0); // In this case JAVA side will throw invalid DeltaException + reg[cKey] = (object)val1; + val1.SetDelta(true); + reg[cKey] = (object)val1; + if (DeltaEx.ToDeltaCount != 2) + { + Util.Log("DeltaEx.ToDataCount = " + DeltaEx.ToDataCount); + Assert.Fail(" Delta count should have been 2, is " + DeltaEx.ToDeltaCount); + } + if (DeltaEx.ToDataCount != 3) + Assert.Fail("Data count should have been 3, is " + DeltaEx.ToDataCount); + DeltaEx.ToDeltaCount = 0; + DeltaEx.ToDataCount = 0; + DeltaEx.FromDataCount = 0; + DeltaEx.FromDeltaCount = 0; + } + + void Do_Put_Contains_Remove_WithDelta() + { + try + { + Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache); + } + catch (IllegalStateException) + { + //do nothng + } + string cKey = m_keys[0]; + DeltaEx val = new DeltaEx(); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + + reg[cKey] = (object)val; + val.SetDelta(true); + reg[cKey] = (object)val; + + DeltaEx val1 = new DeltaEx(0); // In this case JAVA side will throw invalid DeltaException + reg[cKey] = (object)val1; + val1.SetDelta(true); + reg[cKey] = (object)val1; + if (DeltaEx.ToDeltaCount != 2) + { + Util.Log("DeltaEx.ToDataCount = " + DeltaEx.ToDataCount); + Assert.Fail(" Delta count should have been 2, is " + DeltaEx.ToDeltaCount); + } + if (DeltaEx.ToDataCount != 3) + Assert.Fail("Data count should have been 3, is " + DeltaEx.ToDataCount); + DeltaEx.ToDeltaCount = 0; + DeltaEx.ToDataCount = 0; + DeltaEx.FromDataCount = 0; + DeltaEx.FromDeltaCount = 0; + + // Try Contains with key & value that are present. Result should be true. + KeyValuePair myentry = new KeyValuePair(cKey, val1); + bool containsOpflag = reg.Contains(myentry); + Assert.IsTrue(containsOpflag, "Result should be true as key & value are present"); + + // Try Remove with key & value that are present. Result should be true. + bool removeOpflag = reg.Remove(cKey); + Assert.IsTrue(removeOpflag, "Result should be true as key & value are present"); + + //Check Contains with removed entry. Result should be false. + bool updatedcontainsOpflag = reg.Contains(myentry); + Assert.IsFalse(updatedcontainsOpflag, "Result should be false as key & value are removed"); + } + + void DoNotificationWithDelta() + { + try + { + Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache); + } + catch (IllegalStateException) + { + //do nothig. + } + + string cKey = m_keys[0]; + DeltaEx val = new DeltaEx(); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + reg[cKey] = val; + val.SetDelta(true); + reg[cKey] = val; + + string cKey1 = m_keys[1]; + DeltaEx val1 = new DeltaEx(); + reg[cKey1] = val1; + val1.SetDelta(true); + reg[cKey1] = val1; + DeltaEx.ToDeltaCount = 0; + DeltaEx.ToDataCount = 0; + } + + void DoNotificationWithDefaultCloning() + { + string cKey = m_keys[0]; + DeltaTestImpl val = new DeltaTestImpl(); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + reg[cKey] = val; + val.SetIntVar(2); + val.SetDelta(true); + reg[cKey] = val; + + javaobject.PdxDelta pd = new javaobject.PdxDelta(1001); + for (int i = 0; i < 10; i++) + { + reg["pdxdelta"] = pd; + } + } + + void DoNotificationWithDeltaLRU() + { + try + { + Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache); + } + catch (IllegalStateException) + { + //do nothig. + } + + string cKey1 = "key1"; + string cKey2 = "key2"; + string cKey3 = "key3"; + string cKey4 = "key4"; + string cKey5 = "key5"; + string cKey6 = "key6"; + DeltaEx val1 = new DeltaEx(); + DeltaEx val2 = new DeltaEx(); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + reg[cKey1] = val1; + reg[cKey2] = val1; + reg[cKey3] = val1; + reg[cKey4] = val1; + reg[cKey5] = val1; + reg[cKey6] = val1; + val2.SetDelta(true); + reg[cKey1] = val2; + + DeltaEx.ToDeltaCount = 0; + DeltaEx.ToDataCount = 0; + } + + void DoExpirationWithDelta() + { + try + { + Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache); + } + catch (IllegalStateException) + { + //do nothig. + } + + DeltaEx val1 = new DeltaEx(); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + reg[1] = val1; + // Sleep 10 seconds to allow expiration of entry in client 2 + Thread.Sleep(10000); + val1.SetDelta(true); + reg[1] = val1; + DeltaEx.ToDeltaCount = 0; + DeltaEx.ToDataCount = 0; + } + + void DoCqWithDelta() + { + string cKey1 = "key1"; + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + DeltaTestImpl value = new DeltaTestImpl(); + reg[cKey1] = value; + value.SetIntVar(5); + value.SetDelta(true); + reg[cKey1] = value; + } + + void initializeDeltaClientAD() + { + try + { + Serializable.RegisterTypeGeneric(DeltaTestAD.Create, CacheHelper.DCache); + } + catch (IllegalStateException) + { + //do nothng + } + } + + void DoDeltaAD_C1_1() + { + DeltaTestAD val = new DeltaTestAD(); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + reg.GetSubscriptionService().RegisterAllKeys(); + Util.Log("clientAD1 put"); + reg[1] = val; + Util.Log("clientAD1 put done"); + } + + void DoDeltaAD_C2_1() + { + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + + Util.Log("clientAD2 get"); + DeltaTestAD val = (DeltaTestAD)reg[1]; + + Assert.AreEqual(2, val.DeltaUpdate); + Util.Log("clientAD2 get done"); + reg[1] = val; + Util.Log("clientAD2 put done"); + + javaobject.PdxDelta pd = new javaobject.PdxDelta(1001); + for (int i = 0; i < 10; i++) + { + reg["pdxdelta"] = pd; + } + } + + void DoDeltaAD_C1_afterC2Put() + { + Thread.Sleep(15000); + DeltaTestAD val = null; + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + Util.Log("client fetching entry from local cache"); + val = (DeltaTestAD)reg.GetEntry(1).Value; + Assert.IsNotNull(val); + Assert.AreEqual(3, val.DeltaUpdate); + Util.Log("done"); + + System.Threading.Thread.Sleep(5000); + //Assert.Greater(javaobject.PdxDelta.GotDelta, 7, "this should have recieve delta"); + javaobject.PdxDelta pd = (javaobject.PdxDelta)(reg.GetLocalView()["pdxdelta"]); + Assert.Greater(pd.Delta, 7, "this should have recieve delta"); + } + + void runDeltaWithAppdomian(bool cloningenable) + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_deltaAD.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1); + string regionName = "DistRegionAck"; + // if (usePools) + { + //CacheHelper.CreateTCRegion_Pool_AD("DistRegionAck", false, false, null, null, CacheHelper.Locators, "__TEST_POOL1__", false, false, false); + m_client1.Call(CacheHelper.CreateTCRegion_Pool_AD1, regionName, false, true, CacheHelper.Locators, (string)"__TEST_POOL1__", true, cloningenable); + m_client2.Call(CacheHelper.CreateTCRegion_Pool_AD1, regionName, false, true, CacheHelper.Locators, (string)"__TEST_POOL1__", false, cloningenable); + + // m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, false); + // m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + } + + + m_client1.Call(initializeDeltaClientAD); + m_client2.Call(initializeDeltaClientAD); + + m_client1.Call(DoDeltaAD_C1_1); + m_client2.Call(DoDeltaAD_C2_1); + m_client1.Call(DoDeltaAD_C1_afterC2Put); + m_client1.Call(Close); + m_client2.Call(Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void runPutWithDelta() + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1); + m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, false); + m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + + m_client1.Call(DoPutWithDelta); + m_client1.Call(Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void runPut_Contains_Remove_WithDelta() + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1); + m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, false); + m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + + m_client1.Call(Do_Put_Contains_Remove_WithDelta); + m_client1.Call(Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void registerClassCl2() + { + try + { + Serializable.RegisterTypeGeneric(DeltaEx.create, CacheHelper.DCache); + } + catch (IllegalStateException) + { + //do nothing + } + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + + reg.GetSubscriptionService().RegisterRegex(".*"); + AttributesMutator attrMutator = reg.AttributesMutator; + attrMutator.SetCacheListener(new SimpleCacheListener()); + } + + void registerClassDeltaTestImpl() + { + try + { + Serializable.RegisterTypeGeneric(DeltaTestImpl.CreateDeserializable, CacheHelper.DCache); + } + catch (IllegalStateException) + { + // ARB: ignore exception caused by type reregistration. + } + DeltaTestImpl.ResetDataCount(); + + Thread.Sleep(2000); + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + try + { + reg.GetSubscriptionService().RegisterRegex(".*"); + } + catch (Exception) + { + // ARB: ignore regex exception for missing notification channel. + } + } + + void registerCq() + { + Pool thePool = CacheHelper.DCache.GetPoolManager().Find("__TEST_POOL1__"); + QueryService cqService = null; + cqService = thePool.GetQueryService(); + CqAttributesFactory attrFac = new CqAttributesFactory(); + myCqListener = new CqDeltaListener(); + attrFac.AddCqListener(myCqListener); + CqAttributes cqAttr = attrFac.Create(); + CqQuery theQuery = cqService.NewCq("select * from /DistRegionAck d where d.intVar > 4", cqAttr, false); + theQuery.Execute(); + } + + void VerifyDeltaCount() + { + Thread.Sleep(1000); + Util.Log("Total Data count" + DeltaEx.FromDataCount); + Util.Log("Total Data count" + DeltaEx.FromDeltaCount); + if (DeltaEx.FromDataCount != 3) + Assert.Fail("Count of fromData called should be 3 "); + if (DeltaEx.FromDeltaCount != 2) + Assert.Fail("Count of fromDelta called should be 2 "); + if (SimpleCacheListener.isSuccess == false) + Assert.Fail("Listener failure"); + SimpleCacheListener.isSuccess = false; + if (DeltaEx.CloneCount != 2) + Assert.Fail("Clone count should be 2, is " + DeltaEx.CloneCount); + + DeltaEx.FromDataCount = 0; + DeltaEx.FromDeltaCount = 0; + DeltaEx.CloneCount = 0; + } + + void VerifyCloning() + { + Thread.Sleep(1000); + string cKey = m_keys[0]; + IRegion reg = CacheHelper.GetRegion("DistRegionAck"); + DeltaTestImpl val = reg[cKey] as DeltaTestImpl; + + if (val.GetIntVar() != 2) + Assert.Fail("Int value after cloning should be 2, is " + val.GetIntVar()); + if (DeltaTestImpl.GetFromDataCount() != 2) + Assert.Fail("After cloning, fromDataCount should have been 2, is " + DeltaTestImpl.GetFromDataCount()); + if (DeltaTestImpl.GetToDataCount() != 1) + Assert.Fail("After cloning, toDataCount should have been 1, is " + DeltaTestImpl.GetToDataCount()); + + System.Threading.Thread.Sleep(5000); + //Assert.Greater(javaobject.PdxDelta.GotDelta, 7, "this should have recieve delta"); + javaobject.PdxDelta pd = (javaobject.PdxDelta)(reg.GetLocalView()["pdxdelta"]); + Assert.Greater(pd.Delta, 7, "this should have recieve delta"); + } + + void VerifyDeltaCountLRU() + { + Thread.Sleep(1000); + if (DeltaEx.FromDataCount != 8) + { + Util.Log("DeltaEx.FromDataCount = " + DeltaEx.FromDataCount); + Util.Log("DeltaEx.FromDeltaCount = " + DeltaEx.FromDeltaCount); + Assert.Fail("Count should have been 8. 6 for common put and two when pulled from database and deserialized"); + } + if (DeltaEx.FromDeltaCount != 1) + { + Util.Log("DeltaEx.FromDeltaCount = " + DeltaEx.FromDeltaCount); + Assert.Fail("Count should have been 1"); + } + DeltaEx.FromDataCount = 0; + DeltaEx.FromDeltaCount = 0; + } + + void VerifyCqDeltaCount() + { + // Wait for Cq event processing in listener + Thread.Sleep(1000); + if (myCqListener.GetDeltaCount() != 1) + { + Assert.Fail("Delta from CQ event does not have expected value"); + } + if (myCqListener.GetValueCount() != 1) + { + Assert.Fail("Value from CQ event is incorrect"); + } + } + void VerifyExpirationDeltaCount() + { + Thread.Sleep(1000); + if (DeltaEx.FromDataCount != 2) + Assert.Fail("Count should have been 2."); + if (DeltaEx.FromDeltaCount != 0) + Assert.Fail("Count should have been 0."); + DeltaEx.FromDataCount = 0; + DeltaEx.FromDeltaCount = 0; + } + + void runNotificationWithDelta() + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1); + + m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true); + + m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client2.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true); + + m_client2.Call(registerClassCl2); + + m_client1.Call(DoNotificationWithDelta); + m_client2.Call(VerifyDeltaCount); + m_client1.Call(Close); + m_client2.Call(Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void runNotificationWithDefaultCloning() + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_delta_test_impl.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1); + + m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true); + + m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client2.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__", true); + + m_client1.Call(registerClassDeltaTestImpl); + m_client2.Call(registerClassDeltaTestImpl); + + m_client1.Call(DoNotificationWithDefaultCloning); + m_client2.Call(VerifyCloning); + m_client1.Call(Close); + m_client2.Call(Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void runNotificationWithDeltaWithOverFlow() + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + + m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client1.Call(createLRURegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + + m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client2.Call(createLRURegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + + m_client2.Call(registerClassCl2); + + m_client1.Call(DoNotificationWithDeltaLRU); + m_client2.Call(VerifyDeltaCountLRU); + m_client1.Call(Close); + m_client2.Call(Close); + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void runCqWithDelta() + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_delta_test_impl.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1); + + m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + + m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client2.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + + m_client1.Call(registerClassDeltaTestImpl); + m_client2.Call(registerClassDeltaTestImpl); + m_client2.Call(registerCq); + + m_client1.Call(DoCqWithDelta); + m_client2.Call(VerifyCqDeltaCount); + m_client1.Call(Close); + m_client2.Call(Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void runExpirationWithDelta() + { + CacheHelper.SetupJavaServers(true, "cacheserver_with_delta.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC1"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS5", 1); + + m_client1.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client1.Call(createRegionAndAttachPool, "DistRegionAck", "__TEST_POOL1__"); + + m_client2.Call(createPool, "__TEST_POOL1__", CacheHelper.Locators, (string)null, 0, true); + m_client2.Call(createExpirationRegion, "DistRegionAck", "__TEST_POOL1__"); + + m_client2.Call(registerClassCl2); + + m_client1.Call(DoExpirationWithDelta); + m_client2.Call(VerifyExpirationDeltaCount); + m_client1.Call(Close); + m_client2.Call(Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + //#region Tests + + [Test] + public void PutWithDeltaAD() + { + runDeltaWithAppdomian(false); + runDeltaWithAppdomian(true);//cloning enable + } + + [Test] + public void PutWithDelta() + { + runPutWithDelta(); + } + + [Test] + public void Put_Contains_Remove_WithDelta() + { + runPut_Contains_Remove_WithDelta(); + } + + [Test] + public void NotificationWithDelta() + { + runNotificationWithDelta(); + } + + [Test] + public void NotificationWithDefaultCloning() + { + runNotificationWithDefaultCloning(); + } + + [Test] + public void NotificationWithDeltaWithOverFlow() + { + runNotificationWithDeltaWithOverFlow(); + } + + [Test] + public void CqWithDelta() + { + runCqWithDelta(); + } + + [Test] + public void ExpirationWithDelta() + { + runExpirationWithDelta(); + } + + //#endregion + } +} + http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientDurableCqTestsN.cs ---------------------------------------------------------------------- diff --git a/clicache/integration-test/ThinClientDurableCqTestsN.cs b/clicache/integration-test/ThinClientDurableCqTestsN.cs new file mode 100644 index 0000000..b6b0a01 --- /dev/null +++ b/clicache/integration-test/ThinClientDurableCqTestsN.cs @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using NUnit.Framework; +using Apache.Geode.DUnitFramework; +using Apache.Geode.Client.Tests; +using Apache.Geode.Client; +using System; + +namespace Apache.Geode.Client.UnitTests +{ + + [TestFixture] + [Category("group3")] + [Category("unicast_only")] + [Category("generics")] + public class ThinClientDurableCqTests : ThinClientRegionSteps + { + #region Private Members + private UnitProcess m_client1 = null; + private UnitProcess m_client2 = null; + private string[] m_client1DurableCqNames = { "client1DurableCQ1", "client1DurableCQ2", "client1DurableCQ3", "client1DurableCQ4", "client1DurableCQ5", "client1DurableCQ6", "client1DurableCQ7", "client1DurableCQ8" }; + private string[] m_client2DurableCqNames = { "client2DurableCQ1", "client2DurableCQ2", "client2DurableCQ3", "client2DurableCQ4", "client2DurableCQ5", "client2DurableCQ6", "client2DurableCQ7", "client2DurableCQ8" }; + private static string[] QueryRegionNames = { "ListDurableCqs" }; + private static int m_NumberOfCqs = 110; + #endregion + + #region Test helper methods + + protected override ClientBase[] GetClients() + { + m_client1 = new UnitProcess(); + m_client2 = new UnitProcess(); + return new ClientBase[] { m_client1, m_client2 }; + } + + public void InitDurableClient(string locators, int redundancyLevel, + string durableClientId, int durableTimeout) + { + CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, durableClientId, durableTimeout); + CacheHelper.CreateTCRegion_Pool(QueryRegionNames[0], true, true, (ICacheListener)null, CacheHelper.Locators, "__TESTPOOL1_", true); + } + + + public void RegisterCqsClient1(bool isRecycle) + { + Util.Log("Registering Cqs for client1."); + CqAttributesFactory cqAf = new CqAttributesFactory(); + CqAttributes attributes = cqAf.Create(); + QueryService qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); + + if (!isRecycle) + { + qs.NewCq(m_client1DurableCqNames[0], "Select * From /" + QueryRegionNames[0] + " where id = 1", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client1DurableCqNames[1], "Select * From /" + QueryRegionNames[0] + " where id = 10", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client1DurableCqNames[2], "Select * From /" + QueryRegionNames[0], attributes, false).ExecuteWithInitialResults(); + qs.NewCq(m_client1DurableCqNames[3], "Select * From /" + QueryRegionNames[0] + " where id = 3", attributes, false).ExecuteWithInitialResults(); + } + else + { + qs.NewCq(m_client1DurableCqNames[4], "Select * From /" + QueryRegionNames[0] + " where id = 1", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client1DurableCqNames[5], "Select * From /" + QueryRegionNames[0] + " where id = 10", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client1DurableCqNames[6], "Select * From /" + QueryRegionNames[0], attributes, false).ExecuteWithInitialResults(); + qs.NewCq(m_client1DurableCqNames[7], "Select * From /" + QueryRegionNames[0] + " where id = 3", attributes, false).ExecuteWithInitialResults(); + } + + } + + public void RegisterCqsClient1MultipleChunks() + { + Util.Log("Registering Cqs for client1 for multiple chunks."); + CqAttributesFactory cqAf = new CqAttributesFactory(); + CqAttributes attributes = cqAf.Create(); + QueryService qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); + + for (int i = 0; i < m_NumberOfCqs; i++) + qs.NewCq("MyCq_" + i.ToString(), "Select * From /" + QueryRegionNames[0] + " where id = 1", attributes, true).ExecuteWithInitialResults(); + + } + + public void RegisterCqsClient2(bool isRecycle) + { + Util.Log("Registering Cqs for client2."); + CqAttributesFactory cqAf = new CqAttributesFactory(); + CqAttributes attributes = cqAf.Create(); + QueryService qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); + + if (!isRecycle) + { + qs.NewCq(m_client2DurableCqNames[0], "Select * From /" + QueryRegionNames[0] + " where id = 1", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client2DurableCqNames[1], "Select * From /" + QueryRegionNames[0] + " where id = 10", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client2DurableCqNames[2], "Select * From /" + QueryRegionNames[0], attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client2DurableCqNames[3], "Select * From /" + QueryRegionNames[0] + " where id = 3", attributes, true).ExecuteWithInitialResults(); + } + else + { + qs.NewCq(m_client2DurableCqNames[4], "Select * From /" + QueryRegionNames[0] + " where id = 1", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client2DurableCqNames[5], "Select * From /" + QueryRegionNames[0] + " where id = 10", attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client2DurableCqNames[6], "Select * From /" + QueryRegionNames[0], attributes, true).ExecuteWithInitialResults(); + qs.NewCq(m_client2DurableCqNames[7], "Select * From /" + QueryRegionNames[0] + " where id = 3", attributes, true).ExecuteWithInitialResults(); + } + } + + public void VerifyDurableCqListClient1MultipleChunks() + { + Util.Log("Verifying durable Cqs for client1."); + QueryService qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); + System.Collections.Generic.List durableCqList = qs.GetAllDurableCqsFromServer(); + Assert.AreNotEqual(null, durableCqList); + + Assert.AreEqual(m_NumberOfCqs, durableCqList.Count, "Durable CQ count sholuld be %d", m_NumberOfCqs); + + Util.Log("Completed verifying durable Cqs for client1."); + } + + public void VerifyDurableCqListClient1(bool isRecycle) + { + Util.Log("Verifying durable Cqs for client1."); + QueryService qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); + System.Collections.Generic.List durableCqList = qs.GetAllDurableCqsFromServer(); + Assert.AreNotEqual(null, durableCqList); + + if (!isRecycle) + { + Assert.AreEqual(2, durableCqList.Count, "Durable CQ count sholuld be 2"); + Assert.AreEqual(true, durableCqList.Contains(m_client1DurableCqNames[0])); + Assert.AreEqual(true, durableCqList.Contains(m_client1DurableCqNames[1])); + } + else + { + Assert.AreEqual(4, durableCqList.Count, "Durable CQ count sholuld be 4"); + Assert.AreEqual(true, durableCqList.Contains(m_client1DurableCqNames[0])); + Assert.AreEqual(true, durableCqList.Contains(m_client1DurableCqNames[1])); + Assert.AreEqual(true, durableCqList.Contains(m_client1DurableCqNames[4])); + Assert.AreEqual(true, durableCqList.Contains(m_client1DurableCqNames[5])); + } + Util.Log("Completed verifying durable Cqs for client1."); + } + + public void VerifyDurableCqListClient2(bool isRecycle) + { + Util.Log("Verifying durable Cqs for client2."); + QueryService qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); + System.Collections.Generic.List durableCqList = qs.GetAllDurableCqsFromServer(); + Assert.AreNotEqual(null, durableCqList); + + if (!isRecycle) + { + Assert.AreEqual(4, durableCqList.Count, "Durable CQ count sholuld be 4"); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[0])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[1])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[2])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[3])); + } + else + { + Assert.AreEqual(8, durableCqList.Count, "Durable CQ count sholuld be 8"); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[0])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[1])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[2])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[3])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[4])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[5])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[6])); + Assert.AreEqual(true, durableCqList.Contains(m_client2DurableCqNames[7])); + } + } + + public void VerifyEmptyDurableCqListClient1() + { + Util.Log("Verifying empty durable Cqs for client1."); + QueryService qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService(); + System.Collections.Generic.List durableCqList = qs.GetAllDurableCqsFromServer(); + Assert.AreNotEqual(null, durableCqList); + Assert.AreEqual(0, durableCqList.Count, "Durable CQ list sholuld be empty"); + } + + + private void RunTestGetDurableCqsFromServer() + { + try + { + CacheHelper.SetupJavaServers(true, "cacheserverDurableCqs.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cache server 1 started"); + + m_client1.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient1", 300); + m_client2.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient2", 300); + Util.Log("client initialization done."); + + m_client1.Call(RegisterCqsClient1, false); + m_client2.Call(RegisterCqsClient2, false); + Util.Log("Registered DurableCQs."); + + m_client1.Call(VerifyDurableCqListClient1, false); + m_client2.Call(VerifyDurableCqListClient2, false); + + Util.Log("Verified DurableCQ List."); + } + finally + { + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + } + + } + + private void RunTestGetDurableCqsFromServerCyclicClients() + { + try + { + CacheHelper.SetupJavaServers(true, "cacheserverDurableCqs.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cache server 1 started"); + + m_client1.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient1", 300); + m_client2.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient2", 300); + Util.Log("client initialization done."); + + m_client1.Call(RegisterCqsClient1, false); + m_client2.Call(RegisterCqsClient2, false); + Util.Log("Registered DurableCQs."); + + m_client1.Call(VerifyDurableCqListClient1, false); + m_client1.Call(VerifyDurableCqListClient1, false); + Util.Log("Verified DurableCQ List."); + + + m_client1.Call(CacheHelper.CloseKeepAlive); + m_client2.Call(CacheHelper.CloseKeepAlive); + + + m_client1.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient1", 300); + m_client2.Call(InitDurableClient, CacheHelper.Locators, 0, "DurableClient2", 300); + Util.Log("client re-initialization done."); + + m_client1.Call(RegisterCqsClient1, true); + m_client2.Call(RegisterCqsClient2, true); + Util.Log("Registered DurableCQs."); + + m_client1.Call(VerifyDurableCqListClient1, true); + m_client1.Call(VerifyDurableCqListClient1, true); + + Util.Log("Verified DurableCQ List."); + } + finally + { + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + + CacheHelper.StopJavaServer(1); + CacheHelper.StopJavaLocator(1); + } + } + + [TestFixtureSetUp] + public override void InitTests() + { + base.InitTests(); + } + + [TestFixtureTearDown] + public override void EndTests() + { + m_client1.Exit(); + m_client2.Exit(); + base.EndTests(); + } + + [SetUp] + public override void InitTest() + { + base.InitTest(); + } + + [TearDown] + public override void EndTest() + { + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + base.EndTest(); + } + + + #endregion + + #region Tests + + [Test] + public void TestGetDurableCqsFromServerWithLocator() + { + RunTestGetDurableCqsFromServer(); + } + + [Test] + public void TestGetDurableCqsFromServerCyclicClientsWithLocator() + { + RunTestGetDurableCqsFromServerCyclicClients(); + } + + #endregion + + + } +} http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientDurableTestsN.cs ---------------------------------------------------------------------- diff --git a/clicache/integration-test/ThinClientDurableTestsN.cs b/clicache/integration-test/ThinClientDurableTestsN.cs new file mode 100644 index 0000000..e75856e --- /dev/null +++ b/clicache/integration-test/ThinClientDurableTestsN.cs @@ -0,0 +1,982 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Apache.Geode.Client.UnitTests +{ + using NUnit.Framework; + using Apache.Geode.DUnitFramework; + using Apache.Geode.Client; + + + using AssertionException = Apache.Geode.Client.AssertionException; + [TestFixture] + [Category("group2")] + [Category("unicast_only")] + [Category("generics")] + public class ThinClientDurableTests : ThinClientRegionSteps + { + #region Private members + + private UnitProcess m_client1, m_client2, m_feeder; + private string[] m_regexes = { "D-Key-.*", "Key-.*" }; + private string[] m_mixKeys = { "Key-1", "D-Key-1", "L-Key", "LD-Key" }; + private string[] keys = { "Key-1", "Key-2", "Key-3", "Key-4", "Key-5" }; + + private static string DurableClientId1 = "DurableClientId1"; + private static string DurableClientId2 = "DurableClientId2"; + + private static DurableListener m_checker1, m_checker2; + + #endregion + + protected override ClientBase[] GetClients() + { + m_client1 = new UnitProcess(); + m_client2 = new UnitProcess(); + m_feeder = new UnitProcess(); + return new ClientBase[] { m_client1, m_client2, m_feeder }; + } + + [TestFixtureTearDown] + public override void EndTests() + { + CacheHelper.StopJavaServers(); + base.EndTests(); + } + + [TearDown] + public override void EndTest() + { + try + { + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + m_feeder.Call(CacheHelper.Close); + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + finally + { + CacheHelper.StopJavaServers(); + } + base.EndTest(); + } + + #region Common Functions + + public void InitFeeder(string locators, int redundancyLevel) + { + CacheHelper.CreatePool("__TESTPOOL1_", locators, (string)null, redundancyLevel, false); + CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, null, + locators, "__TESTPOOL1_", false); + } + + public void InitFeeder2(string locators, int redundancyLevel) + { + CacheHelper.CreatePool("__TESTPOOL1_", locators, (string)null, redundancyLevel, false); + CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, null, + locators, "__TESTPOOL1_", false); + + CacheHelper.CreatePool("__TESTPOOL2_", locators, (string)null, redundancyLevel, false); + CacheHelper.CreateTCRegion_Pool(RegionNames[1], false, true, null, + locators, "__TESTPOOL2_", false); + } + + public void InitDurableClientWithTwoPools(string locators, + int redundancyLevel, string durableClientId, int durableTimeout, int expectedQ0, int expectedQ1) + { + DurableListener checker = null; + CacheHelper.InitConfigForDurable_Pool2(locators, redundancyLevel, + durableClientId, durableTimeout, 35000, "__TESTPOOL1_"); + CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, + CacheHelper.Locators, "__TESTPOOL1_", true); + + CacheHelper.InitConfigForDurable_Pool2(locators, redundancyLevel, + durableClientId, durableTimeout, 35000, "__TESTPOOL2_"); + CacheHelper.CreateTCRegion_Pool(RegionNames[1], false, true, checker, + CacheHelper.Locators, "__TESTPOOL2_", true); + + IRegion region0 = CacheHelper.GetVerifyRegion(RegionNames[0]); + IRegion region1 = CacheHelper.GetVerifyRegion(RegionNames[1]); + + try + { + region0.GetSubscriptionService().RegisterAllKeys(true); + region1.GetSubscriptionService().RegisterAllKeys(true); + } + catch (Exception other) + { + Assert.Fail("RegisterAllKeys threw unexpected exception: {0}", other.Message); + } + + Pool pool0 = CacheHelper.DCache.GetPoolManager().Find(region0.Attributes.PoolName); + int pendingEventCount0 = pool0.PendingEventCount; + Util.Log("pendingEventCount0 for pool = {0} {1} ", pendingEventCount0, region0.Attributes.PoolName); + string msg = string.Format("Expected Value ={0}, Actual = {1}", expectedQ0, pendingEventCount0); + Assert.AreEqual(expectedQ0, pendingEventCount0, msg); + + Pool pool1 = CacheHelper.DCache.GetPoolManager().Find(region1.Attributes.PoolName); + int pendingEventCount1 = pool1.PendingEventCount; + Util.Log("pendingEventCount1 for pool = {0} {1} ", pendingEventCount1, region1.Attributes.PoolName); + string msg1 = string.Format("Expected Value ={0}, Actual = {1}", expectedQ1, pendingEventCount1); + Assert.AreEqual(expectedQ1, pendingEventCount1, msg1); + + CacheHelper.DCache.ReadyForEvents(); + Thread.Sleep(10000); + + CacheHelper.DCache.Close(true); + } + + public void ClearChecker(int client) + { + if (client == 1) + { + ThinClientDurableTests.m_checker1 = null; + } + else // client == 2 + { + ThinClientDurableTests.m_checker2 = null; + } + } + + public void InitDurableClient(int client, string locators, int redundancyLevel, + string durableClientId, int durableTimeout) + { + // Create DurableListener for first time and use same afterward. + DurableListener checker = null; + if (client == 1) + { + if (ThinClientDurableTests.m_checker1 == null) + { + ThinClientDurableTests.m_checker1 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker1; + } + else // client == 2 + { + if (ThinClientDurableTests.m_checker2 == null) + { + ThinClientDurableTests.m_checker2 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker2; + } + CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, + durableClientId, durableTimeout); + CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, + CacheHelper.Locators, "__TESTPOOL1_", true); + + CacheHelper.DCache.ReadyForEvents(); + IRegion region1 = CacheHelper.GetVerifyRegion(RegionNames[0]); + region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true); + region1.GetSubscriptionService().RegisterRegex(m_regexes[1], false); + //CacheableKey[] ldkeys = { new CacheableString(m_mixKeys[3]) }; + ICollection lkeys = new List(); + lkeys.Add((object)m_mixKeys[3]); + region1.GetSubscriptionService().RegisterKeys(lkeys, true, false); + + ICollection ldkeys = new List(); ; + ldkeys.Add((object)m_mixKeys[2]); + region1.GetSubscriptionService().RegisterKeys(ldkeys, false, false); + } + + public void InitClientXml(string cacheXml) + { + CacheHelper.InitConfig(cacheXml); + } + + public void ReadyForEvents() + { + CacheHelper.DCache.ReadyForEvents(); + } + + public void PendingEventCount(IRegion region, int expectedPendingQSize, bool exception) + { + Util.Log("PendingEventCount regionName = {0} ", region); + string poolName = region.Attributes.PoolName; + if (poolName != null) + { + Util.Log("PendingEventCount poolName = {0} ", poolName); + Pool pool = CacheHelper.DCache.GetPoolManager().Find(poolName); + if (exception) + { + try + { + int pendingEventCount = pool.PendingEventCount; + Util.Log("PendingEventCount Should have got exception "); + Assert.Fail("PendingEventCount Should have got exception"); + } + catch (IllegalStateException ex) + { + Util.Log("Got expected exception for PendingEventCount {0} ", ex.Message); + } + } + else + { + int pendingEventCount = pool.PendingEventCount; + Util.Log("pendingEventCount = {0} ", pendingEventCount); + string msg = string.Format("Expected Value ={0}, Actual = {1}", expectedPendingQSize, pendingEventCount); + Assert.AreEqual(expectedPendingQSize, pendingEventCount, msg); + } + } + } + + public void FeederUpdate(int value, int sleep) + { + IRegion region1 = CacheHelper.GetVerifyRegion(RegionNames[0]); + + region1[m_mixKeys[0]] = value; + Thread.Sleep(sleep); + region1[m_mixKeys[1]] = value; + Thread.Sleep(sleep); + region1[m_mixKeys[2]] = value; + Thread.Sleep(sleep); + region1[m_mixKeys[3]] = value; + Thread.Sleep(sleep); + + region1.Remove(m_mixKeys[0]); + Thread.Sleep(sleep); + region1.Remove(m_mixKeys[1]); + Thread.Sleep(sleep); + region1.Remove(m_mixKeys[2]); + Thread.Sleep(sleep); + region1.Remove(m_mixKeys[3]); + Thread.Sleep(sleep); + } + + public void FeederUpdate2(int pool1, int pool2) + { + IRegion region0 = CacheHelper.GetVerifyRegion(RegionNames[0]); + IRegion region1 = CacheHelper.GetVerifyRegion(RegionNames[1]); + + for (int i = 0; i < pool1; i++) + { + region0[i] = i; + } + + for (int i = 0; i < pool2; i++) + { + region1[i] = i; + } + } + + public void ClientDown(bool keepalive) + { + if (keepalive) + { + CacheHelper.CloseKeepAlive(); + } + else + { + CacheHelper.Close(); + } + } + + public void CrashClient() + { + // TODO: crash client here. + } + + public void KillServer() + { + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + } + + public delegate void KillServerDelegate(); + + #endregion + + + public void VerifyTotal(int client, int keys, int total) + { + DurableListener checker = null; + if (client == 1) + { + checker = ThinClientDurableTests.m_checker1; + } + else // client == 2 + { + checker = ThinClientDurableTests.m_checker2; + } + + if (checker != null) + { + checker.validate(keys, total); + } + else + { + Assert.Fail("Checker is NULL!"); + } + } + + public void VerifyBasic(int client, int keyCount, int eventCount, int durableValue, int nonDurableValue) + {//1 4 8 1 1 + DurableListener checker = null; + if (client == 1) + { + checker = ThinClientDurableTests.m_checker1; + } + else // client == 2 + { + checker = ThinClientDurableTests.m_checker2; + } + + if (checker != null) + { + try + { + checker.validateBasic(keyCount, eventCount, durableValue, nonDurableValue);//4 8 1 1 + } + catch (AssertionException e) + { + Util.Log("VERIFICATION FAILED for client {0}: {1} ", client, e); + throw e; + } + } + else + { + Assert.Fail("Checker is NULL!"); + } + } + + #region Basic Durable Test + + + void runDurableAndNonDurableBasic() + { + CacheHelper.SetupJavaServers(true, + "cacheserver_notify_subscription.xml", "cacheserver_notify_subscription2.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + + for (int redundancy = 0; redundancy <= 1; redundancy++) + { + for (int closeType = 1; closeType <= 2; closeType++) + { + for (int downtime = 0; downtime <= 1; downtime++) // downtime updates + { + Util.Log("Starting loop with closeType = {0}, redundancy = {1}, downtime = {2} ", closeType, redundancy, downtime); + + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + if (redundancy == 1) + { + CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1); + Util.Log("Cacheserver 2 started."); + } + + m_feeder.Call(InitFeeder, CacheHelper.Locators, 0); + Util.Log("Feeder initialized."); + + m_client1.Call(ClearChecker, 1); + m_client2.Call(ClearChecker, 2); + + m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, redundancy, DurableClientId1, 300); + m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, redundancy, DurableClientId2, 3); + + Util.Log("Clients initialized."); + + m_feeder.Call(FeederUpdate, 1, 10); + + Util.Log("Feeder performed first update."); + Thread.Sleep(45000); // wait for HA Q to drain and notify ack to go out. + + switch (closeType) + { + case 1: + + m_client1.Call(ClientDown, true); + m_client2.Call(ClientDown, true); + + Util.Log("Clients downed with keepalive true."); + break; + case 2: + + m_client1.Call(ClientDown, false); + m_client2.Call(ClientDown, false); + + Util.Log("Clients downed with keepalive false."); + break; + case 3: + + m_client1.Call(CrashClient); + + m_client2.Call(CrashClient); + + Util.Log("Clients downed as crash."); + break; + default: + break; + } + + if (downtime == 1) + { + m_feeder.Call(FeederUpdate, 2, 10); + + Util.Log("Feeder performed update during downtime."); + Thread.Sleep(20000); // wait for HA Q to drain and notify ack to go out. + } + + m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, redundancy, DurableClientId1, 300); + + // Sleep for 45 seconds since durable timeout is 30 seconds so that client2 times out + Thread.Sleep(45000); + + m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, redundancy, DurableClientId2, 30); + + Util.Log("Clients brought back up."); + + if (closeType != 2 && downtime == 1) + { + m_client1.Call(VerifyBasic, 1, 4, 12, 2, 1); + + m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1); + + } + else + { + + m_client1.Call(VerifyBasic, 1, 4, 8, 1, 1); + + m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1); + + } + + Util.Log("Verification completed."); + + m_feeder.Call(ClientDown, false); + + m_client1.Call(ClientDown, false); + + m_client2.Call(ClientDown, false); + + Util.Log("Feeder and Clients closed."); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + if (redundancy == 1) + { + CacheHelper.StopJavaServer(2); + Util.Log("Cacheserver 2 stopped."); + } + + Util.Log("Completed loop with closeType = {0}, redundancy = {1}, downtime = {2} ", closeType, redundancy, downtime); + + } // end for int downtime + } // end for int closeType + } // end for int redundancy + CacheHelper.StopJavaLocator(1); + } + + // Basic Durable Test to check durable event recieving for different combination + // of Close type ( Keep Alive = true / false ) , Intermediate update and rudundancy + + [Test] + public void DurableAndNonDurableBasic() + { + runDurableAndNonDurableBasic(); + } // end [Test] DurableAndNonDurableBasic + + #endregion + + #region Durable Intrest Test + + public void InitDurableClientRemoveInterest(int client, string locators, + int redundancyLevel, string durableClientId, int durableTimeout) + { + // Client Registered Durable Intrest on two keys. We need to unregister them all here. + + DurableListener checker = null; + if (client == 1) + { + if (ThinClientDurableTests.m_checker1 == null) + { + ThinClientDurableTests.m_checker1 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker1; + } + else // client == 2 + { + if (ThinClientDurableTests.m_checker2 == null) + { + ThinClientDurableTests.m_checker2 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker2; + } + CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, + durableClientId, durableTimeout); + CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, + CacheHelper.Locators, "__TESTPOOL1_", true); + + CacheHelper.DCache.ReadyForEvents(); + IRegion region1 = CacheHelper.GetVerifyRegion(RegionNames[0]); + + // Unregister Regex only durable + region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true); + region1.GetSubscriptionService().UnregisterRegex(m_regexes[0]); + + // Unregister list only durable + string[] ldkeys = new string[] { m_mixKeys[3] }; + region1.GetSubscriptionService().RegisterKeys(ldkeys, true, false); + region1.GetSubscriptionService().UnregisterKeys(ldkeys); + } + + public void InitDurableClientNoInterest(int client, string locators, + int redundancyLevel, string durableClientId, int durableTimeout) + { + // we use "client" to either create a DurableListener or use the existing ones + // if the clients are initialized for the second time + DurableListener checker = null; + if (client == 1) + { + if (ThinClientDurableTests.m_checker1 == null) + { + ThinClientDurableTests.m_checker1 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker1; + } + else // client == 2 + { + if (ThinClientDurableTests.m_checker2 == null) + { + ThinClientDurableTests.m_checker2 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker2; + } + CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, + durableClientId, durableTimeout); + CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, + CacheHelper.Locators, "__TESTPOOL1_", true); + CacheHelper.DCache.ReadyForEvents(); + } + + void runDurableInterest() + { + CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_feeder.Call(InitFeeder, CacheHelper.Locators, 0); + Util.Log("Feeder started."); + + m_client1.Call(ClearChecker, 1); + m_client2.Call(ClearChecker, 2); + m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, + 0, DurableClientId1, 60); + m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, + 0, DurableClientId2, 60); + Util.Log("Clients started."); + + m_feeder.Call(FeederUpdate, 1, 10); + Util.Log("Feeder performed first update."); + + Thread.Sleep(15000); + + m_client1.Call(ClientDown, true); + m_client2.Call(ClientDown, true); + Util.Log("Clients downed with keepalive true."); + + m_client1.Call(InitDurableClientNoInterest, 1, CacheHelper.Locators, + 0, DurableClientId1, 60); + Util.Log("Client 1 started with no interest."); + + m_client2.Call(InitDurableClientRemoveInterest, 2, CacheHelper.Locators, + 0, DurableClientId2, 60); + Util.Log("Client 2 started with remove interest."); + + m_feeder.Call(FeederUpdate, 2, 10); + Util.Log("Feeder performed second update."); + + Thread.Sleep(10000); + + // only durable Intrest will remain. + m_client1.Call(VerifyBasic, 1, 4, 12, 2, 1); + + // no second update should be recieved. + m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1); + Util.Log("Verification completed."); + + m_feeder.Call(ClientDown, false); + m_client1.Call(ClientDown, false); + m_client2.Call(ClientDown, false); + Util.Log("Feeder and Clients closed."); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + //This is to test whether durable registered intrests remains on reconnect. and + // Unregister works on reconnect. + + [Test] + public void DurableInterest() + { + runDurableInterest(); + } // end [Test] DurableInterest + #endregion + + #region Durable Failover Test + + + public void InitDurableClientForFailover(int client, string locators, + int redundancyLevel, string durableClientId, int durableTimeout) + { + // we use "client" to either create a DurableListener or use the existing ones + // if the clients are initialized for the second time + DurableListener checker = null; + if (client == 1) + { + if (ThinClientDurableTests.m_checker1 == null) + { + ThinClientDurableTests.m_checker1 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker1; + } + else // client == 2 + { + if (ThinClientDurableTests.m_checker2 == null) + { + ThinClientDurableTests.m_checker2 = DurableListener.Create(); + } + checker = ThinClientDurableTests.m_checker2; + } + CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, + durableClientId, durableTimeout, 35000); + CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, + CacheHelper.Locators, "__TESTPOOL1_", true); + CacheHelper.DCache.ReadyForEvents(); + IRegion region1 = CacheHelper.GetVerifyRegion(RegionNames[0]); + + try + { + region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true); + region1.GetSubscriptionService().RegisterRegex(m_regexes[1], false); + } + catch (Exception other) + { + Assert.Fail("RegisterKeys threw unexpected exception: {0}", other.Message); + } + } + + public void FeederUpdateForFailover(string region, int value, int sleep) + { + //update only 2 keys. + IRegion region1 = CacheHelper.GetVerifyRegion(region); + + region1[m_mixKeys[0]] = value; + Thread.Sleep(sleep); + region1[m_mixKeys[1]] = value; + Thread.Sleep(sleep); + + } + + void runDurableFailover() + { + CacheHelper.SetupJavaServers(true, + "cacheserver_notify_subscription.xml", "cacheserver_notify_subscription2.xml"); + + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + + for (int clientDown = 0; clientDown <= 1; clientDown++) + { + for (int redundancy = 0; redundancy <= 1; redundancy++) + { + Util.Log("Starting loop with clientDown = {0}, redundancy = {1}", clientDown, redundancy); + + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_feeder.Call(InitFeeder, CacheHelper.Locators, 0); + Util.Log("Feeder started with redundancy level as 0."); + + m_client1.Call(ClearChecker, 1); + m_client1.Call(InitDurableClientForFailover, 1, CacheHelper.Locators, + redundancy, DurableClientId1, 300); + Util.Log("Client started with redundancy level as {0}.", redundancy); + + m_feeder.Call(FeederUpdateForFailover, RegionNames[0], 1, 10); + Util.Log("Feeder updates 1 completed."); + + CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1); + Util.Log("Cacheserver 2 started."); + + //Time for redundancy thread to detect. + Thread.Sleep(35000); + + if (clientDown == 1) + { + m_client1.Call(ClientDown, true); + } + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + //Time for failover + Thread.Sleep(5000); + + m_feeder.Call(FeederUpdateForFailover, RegionNames[0], 2, 10); + Util.Log("Feeder updates 2 completed."); + + //Restart Client + if (clientDown == 1) + { + m_client1.Call(InitDurableClientForFailover, 1, CacheHelper.Locators, + redundancy, DurableClientId1, 300); + Util.Log("Client Restarted with redundancy level as {0}.", redundancy); + } + + //Verify + if (clientDown == 1) + { + if (redundancy == 0) // Events missed + { + m_client1.Call(VerifyBasic, 1, 2, 2, 1, 1); + } + else // redundancy == 1 Only Durable Events should be recieved. + { + m_client1.Call(VerifyBasic, 1, 2, 3, 2, 1); + } + } + else // In normal failover all events should be recieved. + { + m_client1.Call(VerifyBasic, 1, 2, 4, 2, 2); + } + + Util.Log("Verification completed."); + + m_feeder.Call(ClientDown, false); + m_client1.Call(ClientDown, false); + Util.Log("Feeder and Client closed."); + + CacheHelper.StopJavaServer(2); + Util.Log("Cacheserver 2 stopped."); + + Util.Log("Completed loop with clientDown = {0}, redundancy = {1}", clientDown, redundancy); + }// for redundancy + } // for clientDown + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void RunDurableClient(int expectedPendingQSize) + { + Properties pp = Properties.Create(); + pp.Insert("durable-client-id", "DurableClientId"); + pp.Insert("durable-timeout", "30"); + + CacheFactory cacheFactory = CacheFactory.CreateCacheFactory(pp); + Cache cache = cacheFactory.Create(); + cache.GetPoolFactory().SetSubscriptionEnabled(true); + cache.GetPoolFactory().SetSubscriptionAckInterval(5000); + cache.GetPoolFactory().SetSubscriptionMessageTrackingTimeout(5000); + Util.Log("Created the Geode Cache Programmatically"); + + RegionFactory regionFactory = cache.CreateRegionFactory(RegionShortcut.CACHING_PROXY); + IRegion region = regionFactory.Create("DistRegionAck"); + Util.Log("Created the DistRegionAck Region Programmatically"); + + QueryService qService = cache.GetQueryService(); + CqAttributesFactory cqFac = new CqAttributesFactory(); + + ICqListener cqLstner = new MyCqListener1(); + cqFac.AddCqListener(cqLstner); + CqAttributes cqAttr = cqFac.Create(); + Util.Log("Attached CqListener"); + String query = "select * from /DistRegionAck"; + CqQuery qry = qService.NewCq("MyCq", query, cqAttr, true); + Util.Log("Created new CqQuery"); + + qry.Execute(); + Util.Log("Executed new CqQuery"); + Thread.Sleep(10000); + + PendingEventCount(region, expectedPendingQSize, false); + + //Send ready for Event message to Server( only for Durable Clients ). + //Server will send queued events to client after recieving this. + cache.ReadyForEvents(); + + Util.Log("Sent ReadyForEvents message to server"); + Thread.Sleep(10000); + // Close the Geode Cache with keepalive = true. Server will queue events for + // durable registered keys and will deliver all events when client will reconnect + // within timeout period and send "readyForEvents()" + + PendingEventCount(region, 0, true); + + cache.Close(true); + + Util.Log("Closed the Geode Cache with keepalive as true"); + } + + void runDurableClientWithTwoPools() + { + CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_feeder.Call(InitFeeder2, CacheHelper.Locators, 0); + Util.Log("Feeder started."); + + m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, 30, -2, -2); + Util.Log("DurableClient with Two Pools Initialized"); + + m_feeder.Call(FeederUpdate2, 5, 10); + Util.Log("Feeder performed first update."); + Thread.Sleep(15000); + + m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, 30, 6, 11); //+1 for marker, so 5+1, 10+1 etc + Util.Log("DurableClient with Two Pools after first update"); + + m_feeder.Call(FeederUpdate2, 10, 5); + Util.Log("Feeder performed second update."); + Thread.Sleep(15000); + + m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, 30, 16, 16); + Util.Log("DurableClient with Two Pools after second update"); + + Thread.Sleep(45000); //45 > 30 secs. + m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, 30, -1, -1); + Util.Log("DurableClient with Two Pools after timeout"); + + m_feeder.Call(ClientDown, false); + Util.Log("Feeder and Clients closed."); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearEndpoints(); + CacheHelper.ClearLocators(); + } + + void RunFeeder() + { + CacheFactory cacheFactory = CacheFactory.CreateCacheFactory(); + Util.Log("Feeder connected to the Geode Distributed System"); + + Cache cache = cacheFactory.Create(); + Util.Log("Created the Geode Cache"); + + RegionFactory regionFactory = cache.CreateRegionFactory(RegionShortcut.PROXY); + Util.Log("Created the RegionFactory"); + + // Create the Region Programmatically. + IRegion region = regionFactory.Create("DistRegionAck"); + Util.Log("Created the Region Programmatically."); + + PendingEventCount(region, 0, true); + + for (int i = 0; i < 10; i++) + { + region[i] = i; + } + Thread.Sleep(10000); + Util.Log("put on 0-10 keys done."); + + // Close the Geode Cache + cache.Close(); + Util.Log("Closed the Geode Cache"); + } + + void RunFeeder1() + { + CacheFactory cacheFactory = CacheFactory.CreateCacheFactory(); + Util.Log("Feeder connected to the Geode Distributed System"); + + Cache cache = cacheFactory.Create(); + Util.Log("Created the Geode Cache"); + + RegionFactory regionFactory = cache.CreateRegionFactory(RegionShortcut.PROXY); + Util.Log("Created the RegionFactory"); + + // Create the Region Programmatically. + IRegion region = regionFactory.Create("DistRegionAck"); + Util.Log("Created the Region Programmatically."); + + PendingEventCount(region, 0, true); + + for (int i = 10; i < 20; i++) + { + region[i] = i; + } + Thread.Sleep(10000); + Util.Log("put on 10-20 keys done."); + + // Close the Geode Cache + cache.Close(); + Util.Log("Closed the Geode Cache"); + } + + void VerifyEvents() + { + Util.Log("MyCqListener1.m_cntEvents = {0} ", MyCqListener1.m_cntEvents); + Assert.AreEqual(MyCqListener1.m_cntEvents, 20, "Incorrect events, expected 20"); + } + + void runCQDurable() + { + CacheHelper.SetupJavaServers(false, "serverDurableClient.xml"); + CacheHelper.StartJavaServer(1, "GFECS1"); + m_client1.Call(RunDurableClient, -2); // 1st time no Q, hence check -2 as PendingEventCount. + m_client2.Call(RunFeeder); + m_client1.Call(RunDurableClient, 10); + m_client2.Call(RunFeeder1); + m_client1.Call(RunDurableClient, 10); + m_client1.Call(VerifyEvents); + Thread.Sleep(45 * 1000); // sleep 45 secs > 30 secs, check -1 as PendingEventCount. + m_client1.Call(RunDurableClient, -1); + CacheHelper.StopJavaServer(1); + } + + [Test] + public void DurableFailover() + { + runDurableFailover(); + } // end [Test] DurableFailover + + [Test] + public void CQDurable() + { + runCQDurable(); + + runDurableClientWithTwoPools(); + } + #endregion + } +}