asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From il...@apache.org
Subject asterixdb git commit: Enable Feed Changes to work with BAD project
Date Wed, 01 Mar 2017 20:26:49 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master beb5dc746 -> 7152182a3


Enable Feed Changes to work with BAD project

Extracts the ActiveListener
Enables listeners to survive after job executions

Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1524
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7152182a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7152182a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7152182a

Branch: refs/heads/master
Commit: 7152182a34b4845d0261f08d16f34695fc23acc4
Parents: beb5dc7
Author: Steven Glenn Jacobs <sjaco002@ucr.edu>
Authored: Thu Feb 23 21:11:21 2017 -0800
Committer: Steven Jacobs <sjaco002@ucr.edu>
Committed: Wed Mar 1 12:20:57 2017 -0800

----------------------------------------------------------------------
 .../active/ActiveJobNotificationHandler.java    | 14 +++--
 .../management/ActiveEntityEventsListener.java  | 55 ++++++++++++++++++++
 .../feed/management/FeedEventsListener.java     | 25 ++-------
 3 files changed, 68 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7152182a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index e4491bd..d7998f8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -55,14 +55,15 @@ public class ActiveJobNotificationHandler implements Runnable {
                 if (entityId != null) {
                     IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
                     LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind());
-                    LOGGER.log(Level.FINER, "Notifying the listener");
-                    listener.notify(event);
                     if (event.getEventKind() == Kind.JOB_FINISHED) {
                         LOGGER.log(Level.FINER, "Removing the job");
                         jobId2ActiveJobInfos.remove(event.getJobId());
-                        LOGGER.log(Level.FINER, "Removing the listener since it is not active
anymore");
-                        entityEventListeners.remove(listener.getEntityId());
                     }
+                    if (listener != null) {
+                        LOGGER.log(Level.FINER, "Notifying the listener");
+                        listener.notify(event);
+                    }
+
                 } else {
                     LOGGER.log(Level.SEVERE, "Entity not found for received message for job
" + event.getJobId());
                 }
@@ -75,6 +76,11 @@ public class ActiveJobNotificationHandler implements Runnable {
         LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
     }
 
+    public synchronized void removeListener(IActiveEntityEventsListener listener) throws
HyracksDataException {
+        LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
+        unregisterListener(listener);
+    }
+
     public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called
with entity " + entityId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7152182a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
new file mode 100644
index 0000000..365c3ce
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener {
+
+    // members
+    protected EntityId entityId;
+    protected List<IDataset> datasets;
+    protected volatile ActivityState state;
+    protected JobId jobId;
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    @Override
+    public ActivityState getState() {
+        return state;
+    }
+
+    @Override
+    public boolean isEntityUsingDataset(IDataset dataset) {
+        return datasets.contains(dataset);
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7152182a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 2a87cab..f49da3c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -25,9 +25,9 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveEventSubscriber;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.metadata.IDataset;
@@ -36,20 +36,15 @@ import org.apache.asterix.external.feed.watch.NoOpSubscriber;
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 
-public class FeedEventsListener implements IActiveEntityEventsListener {
+public class FeedEventsListener extends ActiveEntityEventsListener {
     // constants
     private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName());
     // members
-    private final EntityId entityId;
-    private final List<IDataset> datasets;
     private final String[] sources;
     private final List<IActiveEventSubscriber> subscribers;
-    private volatile ActivityState state;
     private int numRegistered;
-    private JobId jobId;
 
     public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[]
sources) {
         this.entityId = entityId;
@@ -111,6 +106,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener
{
         IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
         JobStatus status = hcc.getJobStatus(jobId);
         state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
+        ActiveJobNotificationHandler.INSTANCE.removeListener(this);
     }
 
     private void start(ActiveEvent event) {
@@ -119,16 +115,6 @@ public class FeedEventsListener implements IActiveEntityEventsListener
{
     }
 
     @Override
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    @Override
-    public ActivityState getState() {
-        return state;
-    }
-
-    @Override
     public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException
{
         if (state != ActivityState.STARTED && state != ActivityState.STOPPED) {
             throw new HyracksDataException("Can only wait for STARTED or STOPPED state");
@@ -150,11 +136,6 @@ public class FeedEventsListener implements IActiveEntityEventsListener
{
         return subscriber;
     }
 
-    @Override
-    public boolean isEntityUsingDataset(IDataset dataset) {
-        return datasets.contains(dataset);
-    }
-
     public String[] getSources() {
         return sources;
     }


Mime
View raw message