geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hiteshkhame...@apache.org
Subject [57/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/main/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/main/java/org/apache/geode)
Date Tue, 13 Sep 2016 22:44:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..815932e
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySenderEventProcessor
{
+  private static final Logger logger = LogService.getLogger();
+  
+  protected RemoteParallelGatewaySenderEventProcessor(
+      AbstractGatewaySender sender) {
+    super(sender);
+  }
+  
+  /**
+   * use in concurrent scenario where queue is to be shared among all the processors.
+   */
+  protected RemoteParallelGatewaySenderEventProcessor(AbstractGatewaySender sender,  Set<Region>
userRegions, int id, int nDispatcher) {
+    super(sender,  userRegions, id, nDispatcher);
+  }
+  
+  @Override
+  protected void rebalance() {
+    GatewaySenderStats statistics = this.sender.getStatistics();
+    long startTime = statistics.startLoadBalance();
+    try {
+      if (this.dispatcher.isRemoteDispatcher()) {
+        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher)
this.dispatcher;
+        if (remoteDispatcher.isConnectedToRemote()) {
+          remoteDispatcher.stopAckReaderThread();
+          remoteDispatcher.destroyConnection();
+        }
+      }
+    } finally {
+      statistics.endLoadBalance(startTime);
+    }
+  }
+  
+  public void initializeEventDispatcher() {
+    if (logger.isDebugEnabled()) {
+      logger.debug(" Creating the GatewayEventRemoteDispatcher");
+    }
+    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
+    }
+  }
+  
+  /**
+   * Returns if corresponding receiver WAN site of this GatewaySender has
+   * GemfireVersion > 7.0.1
+   * 
+   * @param disp
+   * @return true if remote site Gemfire Version is >= 7.0.1
+   */
+  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
+      throws GatewaySenderException {
+      try {
+        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher)
disp;
+        // This will create a new connection if no batch has been sent till
+        // now.
+        Connection conn = remoteDispatcher.getConnection(false);
+        if (conn != null) {
+          short remoteSiteVersion = conn.getWanSiteVersion();
+          if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
+            return true;
+          }
+        }
+      } catch (GatewaySenderException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException
+            || e instanceof GatewaySenderConfigurationException
+            || cause instanceof ConnectionDestroyedException) {
+          try {
+            int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
+            if (logger.isDebugEnabled()) {
+              logger.debug("Sleeping for {} milliseconds", sleepInterval);
+            }
+            Thread.sleep(sleepInterval);
+          } catch (InterruptedException ie) {
+            // log the exception
+            if (logger.isDebugEnabled()){
+              logger.debug(ie.getMessage(), ie);
+            }
+          }
+        }
+        throw e;
+      }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..8a25ab6
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class RemoteConcurrentSerialGatewaySenderEventProcessor extends
+    ConcurrentSerialGatewaySenderEventProcessor {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  public RemoteConcurrentSerialGatewaySenderEventProcessor(
+      AbstractGatewaySender sender) {
+    super(sender);
+  }
+
+  @Override
+  protected void initializeMessageQueue(String id) {
+    for (int i = 0; i < sender.getDispatcherThreads(); i++) {
+      processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, id
+          + "." + i));
+      if (logger.isDebugEnabled()) {
+        logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i));
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..82fa585
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class RemoteSerialGatewaySenderEventProcessor extends
+    SerialGatewaySenderEventProcessor {
+
+  private static final Logger logger = LogService.getLogger();
+  public RemoteSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
+      String id) {
+    super(sender, id);
+  }
+
+  public void initializeEventDispatcher() {
+    if (logger.isDebugEnabled()) {
+      logger.debug(" Creating the GatewayEventRemoteDispatcher");
+    }
+    // In case of serial there is a way to create gatewaySender and attach
+    // asyncEventListener. Not sure of the use-case but there are dunit tests
+    // To make them pass uncommenting the below condition
+    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
+    }else{
+      this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
new file mode 100644
index 0000000..85e1bc0
--- /dev/null
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * @since GemFire 7.0
+ *
+ */
+public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+
+  public SerialGatewaySenderImpl(){
+    super();
+    this.isParallel = false;
+  }
+  public SerialGatewaySenderImpl(Cache cache,
+      GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+  }
+  
+  @Override
+  public void start() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Starting gatewaySender : {}", this);
+    }
+    
+    this.getLifeCycleLock().writeLock().lock();
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING,
this.getId()));
+        return;
+      }
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
+            .getConfig().getLocators();
+        if (locators.length() == 0) {
+          throw new GatewaySenderConfigurationException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      getSenderAdvisor().initDLockService();
+      if (!isPrimary()) {
+        if (getSenderAdvisor().volunteerForPrimary()) {
+          getSenderAdvisor().makePrimary();
+        } else {
+          getSenderAdvisor().makeSecondary();
+        }
+      }
+      if (getDispatcherThreads() > 1) {
+        eventProcessor = new RemoteConcurrentSerialGatewaySenderEventProcessor(
+            SerialGatewaySenderImpl.this);
+      } else {
+        eventProcessor = new RemoteSerialGatewaySenderEventProcessor(
+            SerialGatewaySenderImpl.this, getId());
+      }
+      eventProcessor.start();
+      waitForRunningStatus();
+      this.startTime = System.currentTimeMillis();
+      
+      //Only notify the type registry if this is a WAN gateway queue
+      if(!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+
+      
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+      
+      logger.info(LocalizedMessage.create(LocalizedStrings.SerialGatewaySenderImpl_STARTED__0,
this));
+  
+      enqueueTempEvents();
+    } finally {
+      this.getLifeCycleLock().writeLock().unlock();
+    }
+  }
+  
+  @Override
+  public void stop() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Stopping Gateway Sender : {}", this);
+    }
+    this.getLifeCycleLock().writeLock().lock();
+    try {
+      // Stop the dispatcher
+      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+      if (ev != null && !ev.isStopped()) {
+        ev.stopProcessing();
+      }
+      
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener listener : this.listeners) {
+        listener.close();
+      }
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+
+      clearTempEventsAfterSenderStopped();
+    } finally {
+      this.getLifeCycleLock().writeLock().unlock();
+    }
+   
+    if (this.isPrimary()) {
+      try {
+        DistributedLockService
+            .destroy(getSenderAdvisor().getDLockServiceName());
+      } catch (IllegalArgumentException e) {
+        // service not found... ignore
+      }
+    }
+    Set<RegionQueue> queues = getQueues();
+    if (queues != null && !queues.isEmpty()) {
+      for (RegionQueue q : queues) {
+        ((SerialGatewaySenderQueue)q).cleanUp();
+      }
+    }
+   
+    this.setIsPrimary(false);
+    new UpdateAttributesProcessor(this).distribute(false);
+    Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread();
+    if (lockObtainingThread != null && lockObtainingThread.isAlive()) {
+      // wait a while for thread to terminate
+      try {
+        lockObtainingThread.join(3000);
+      } catch (InterruptedException ex) {
+        // we allowed our join to be canceled
+        // reset interrupt bit so this thread knows it has been interrupted
+        Thread.currentThread().interrupt();
+      }
+      if (lockObtainingThread.isAlive()) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewaySender_COULD_NOT_STOP_LOCK_OBTAINING_THREAD_DURING_GATEWAY_SENDER_STOP));
+      }
+    }
+    
+    InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+        .getDistributedSystem();
+    system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+    
+    this.eventProcessor = null;
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("SerialGatewaySender{");
+    sb.append("id=" + getId());
+    sb.append(",remoteDsId="+ getRemoteDSId());
+    sb.append(",isRunning ="+ isRunning());
+    sb.append(",isPrimary ="+ isPrimary());
+    sb.append("}");
+    return sb.toString();
+  }
+ 
+  @Override
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
+    pf.Id = getId();
+    pf.startTime = getStartTime();
+    pf.remoteDSId = getRemoteDSId();
+    pf.isRunning = isRunning();
+    pf.isPrimary = isPrimary();
+    pf.isParallel = false;
+    pf.isBatchConflationEnabled = isBatchConflationEnabled();
+    pf.isPersistenceEnabled = isPersistenceEnabled();
+    pf.alertThreshold = getAlertThreshold();
+    pf.manualStart = isManualStart();
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters())
{
+      pf.eventFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
+      pf.transFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (AsyncEventListener listener : getAsyncEventListeners()) {
+      pf.senderEventListenerClassNames.add(listener.getClass().getName());
+    }
+    pf.isDiskSynchronous = isDiskSynchronous();
+    pf.dispatcherThreads = getDispatcherThreads();
+    pf.orderPolicy = getOrderPolicy();
+    pf.serverLocation = this.getServerLocation(); 
+  }
+
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    EventID originalEventId = clonedEvent.getEventId();
+    long originalThreadId = originalEventId.getThreadID();
+    long newThreadId = originalThreadId;
+    if (ThreadIdentifier.isWanTypeThreadID(newThreadId)) {
+      // This thread id has already been converted. Do nothing.
+    } else {
+      newThreadId = ThreadIdentifier
+        .createFakeThreadIDForParallelGSPrimaryBucket(0, originalThreadId,
+            getEventIdIndex());
+    }
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID());
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, original event id={}, originalThreadId={},
new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
+}


Mime
View raw message