asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [9/9] asterixdb git commit: Feed Connection Refactoring
Date Sun, 19 Feb 2017 07:14:55 GMT
Feed Connection Refactoring

1. The feed subscription network using FeedJoint is removed.
2. FeedConnection metadata dataset is added (pkeys: dataverseName,
   feedName, datasetName).
3. Replaced the old intake job + collect job combination with one single
   job using SplitOperator.
4. Now one feed can connect to multiple datasets.
5. The disconnect feed job is replaced by ActiveManagerMessage.
6. The new feed life cycle is:
   - Create feed
   - Connect feed to dataset0, dataset1, dataset2, etc.
   - Start feed
   - Stop feed
   - Disconnect feed
 7. New feedEventListner framework by Abdullah

Change-Id: Ic36267eb9a10df21734ce1cc1f38583e23c9e8f0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1259
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fff200ca
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fff200ca
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fff200ca

Branch: refs/heads/master
Commit: fff200ca8e932edff58862fcc95502d65ceebd01
Parents: 692b8a8
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Sat Feb 18 20:32:14 2017 -0800
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Sat Feb 18 23:13:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/asterix/active/ActiveEvent.java  |  27 +-
 .../org/apache/asterix/active/ActiveJob.java    |  92 ---
 .../active/ActiveJobNotificationHandler.java    | 147 ++---
 .../asterix/active/ActiveLifecycleListener.java |  11 +-
 .../apache/asterix/active/ActivityState.java    |  26 +-
 .../active/IActiveEntityEventsListener.java     |  43 +-
 .../asterix/active/IActiveEventSubscriber.java  |  48 ++
 ...ceRandomPartitioningFeedComputationRule.java |   5 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |   7 +-
 .../apache/asterix/app/external/FeedJoint.java  | 212 ------
 .../app/external/FeedWorkCollection.java        | 140 ----
 .../asterix/app/translator/QueryTranslator.java | 491 +++++---------
 .../apache/asterix/utils/FeedOperations.java    | 397 ++++++++++++
 .../metadata_dataset/metadata_dataset.1.adm     |  27 +-
 .../metadata_datatype/metadata_datatype.1.adm   | 131 ++--
 .../basic/metadata_index/metadata_index.1.adm   |  27 +-
 .../metadata_selfjoin/metadata_selfjoin.1.adm   |   1 +
 .../big_object_feed_20M.2.update.aql            |   2 +
 .../typed_adapter/typed_adapter.4.update.aql    |   2 +
 .../change-feed-with-meta-csv.2.update.aql      |   4 +-
 ...a-pk-in-meta-index-after-ingest.2.update.aql |   2 +
 ...ta-pk-in-meta-index-after-ingest.4.query.aql |   2 +-
 ...h-meta-pk-in-meta-index-in-meta.2.update.aql |   2 +
 ...index-with-missing-after-ingest.2.update.aql |   2 +
 ...-pk-in-meta-open-index-in-value.2.update.aql |   2 +
 ...in-meta-open-index-with-missing.2.update.aql |   2 +
 ...hange-feed-with-meta-pk-in-meta.2.update.aql |   2 +
 ...feed-with-meta-with-mixed-index.2.update.aql |   2 +
 .../feeds/change-feed/change-feed.2.update.aql  |   2 +
 .../feeds/connect-feed/connect-feed.0.ddl.aql   |  46 ++
 .../connect-feed/connect-feed.1.update.aql      |  29 +
 .../connect-feed/connect-feed.2.update.aql      |  20 +
 .../feeds/connect-feed/connect-feed.3.query.aql |  20 +
 .../feeds/connect-feed/connect-feed.4.ddl.aql   |  19 +
 .../feed-push-socket.2.update.aql               |   2 +
 .../feed-push-socket.3.server.aql               |   2 +-
 .../feed-push-socket.5.update.aql               |   1 +
 .../feed-with-external-function.1.ddl.aql       |   3 +-
 .../feed-with-external-function.3.update.aql    |   5 +-
 ...external-parser-with-open-index.4.update.aql |   2 +
 ...al-parser-with-two-open-indexes.4.update.aql |   2 +
 .../feed-with-external-parser.4.update.aql      |   2 +
 .../feed-with-filtered-dataset.2.update.aql     |   2 +
 .../feed-with-meta-pk-in-meta.2.update.aql      |   2 +
 .../feed-with-multiple-indexes.4.update.aql     |   2 +
 .../feeds/feeds_02/feeds_02.2.update.aql        |   6 +-
 .../queries/feeds/feeds_03/feeds_03.1.ddl.aql   |   7 +-
 .../feeds/feeds_03/feeds_03.2.update.aql        |   5 +
 .../queries/feeds/feeds_03/feeds_03.3.query.aql |   3 +-
 .../feeds/feeds_07/feeds_07.2.update.aql        |   6 +-
 .../feeds/feeds_08/feeds_08.2.update.aql        |   4 +-
 .../feeds/feeds_09/feeds_09.2.update.aql        |   6 +-
 .../feeds/feeds_10/feeds_10.2.update.aql        |   4 +-
 .../feeds/feeds_11/feeds_11.2.update.aql        |   8 +-
 .../feeds/feeds_12/feeds_12.2.update.aql        |   8 +-
 .../issue_230_feeds.2.update.aql                |   8 +-
 .../revised-tweet-parser.2.update.aql           |   2 +
 .../feeds/start-feed/start-feed.1.ddl.aql       |  59 ++
 .../feeds/start-feed/start-feed.2.update.aql    |  33 +
 .../feeds/start-feed/start-feed.3.server.aql    |  26 +
 .../feeds/start-feed/start-feed.4.sleep.aql     |  26 +
 .../feeds/start-feed/start-feed.5.update.aql    |  30 +
 .../feeds/start-feed/start-feed.6.query.aql     |  32 +
 .../feeds/start-feed/start-feed.7.server.aql    |  27 +
 .../feeds/start-feed/start-feed.8.ddl.aql       |  28 +
 .../start-started-feed.0.ddl.aql                |  40 ++
 .../start-started-feed.1.update.aql             |  21 +
 .../start-started-feed.2.sleep.aql              |  26 +
 .../start-started-feed.3.update.aql             |  21 +
 .../start-started-feed.4.ddl.aql                |  21 +
 .../stop-stopped-feed.0.ddl.aql                 |  41 ++
 .../stop-stopped-feed.1.update.aql              |  21 +
 .../stop-stopped-feed.2.sleep.aql               |  19 +
 .../stop-stopped-feed.3.update.aql              |  21 +
 .../stop-stopped-feed.4.sleep.aql               |  20 +
 .../stop-stopped-feed.5.update.aql              |  21 +
 .../twitter-feed/twitter-feed.2.update.aql      |   4 +-
 .../feeds/upsert-feed/upsert-feed.2.update.aql  |   2 +
 .../feeds/upsert-feed/upsert-feed.3.server.aql  |   2 +-
 .../feeds/upsert-feed/upsert-feed.5.update.aql  |   2 +-
 .../issue_251_dataset_hint_7.2.update.aql       |   6 +-
 ...taset_with_meta_primary_index-1.2.update.aql |   4 +-
 .../feeds/feeds_01/feeds_01.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_02/feeds_02.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_02/feeds_02.2.update.sqlpp      |   1 +
 .../feeds/feeds_03/feeds_03.1.ddl.sqlpp         |   4 +-
 .../feeds/feeds_03/feeds_03.2.update.sqlpp      |   4 +
 .../feeds/feeds_03/feeds_03.3.query.sqlpp       |   2 +-
 .../feeds/feeds_07/feeds_07.1.ddl.sqlpp         |   4 +-
 .../feeds/feeds_07/feeds_07.2.update.sqlpp      |   3 +-
 .../feeds/feeds_08/feeds_08.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_08/feeds_08.2.update.sqlpp      |   2 +
 .../feeds/feeds_09/feeds_09.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_09/feeds_09.2.update.sqlpp      |   1 +
 .../feeds/feeds_10/feeds_10.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_10/feeds_10.2.update.sqlpp      |   1 +
 .../feeds/feeds_11/feeds_11.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_11/feeds_11.2.update.sqlpp      |   1 +
 .../feeds/feeds_12/feeds_12.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_12/feeds_12.2.update.sqlpp      |   1 +
 .../issue_230_feeds/issue_230_feeds.1.ddl.sqlpp |   2 +-
 .../issue_230_feeds.2.update.sqlpp              |   1 +
 .../upsert-feed/upsert-feed.2.update.sqlpp      |   1 +
 .../upsert-feed/upsert-feed.5.update.sqlpp      |   1 +
 .../issue_251_dataset_hint_7.1.ddl.sqlpp        |   2 +-
 .../issue_251_dataset_hint_7.2.update.sqlpp     |   2 +
 .../change-feed-with-meta-pk-in-meta.4.adm      |   2 +-
 .../change-feed-with-meta-pk-in-meta.5.adm      |   2 +-
 .../feeds/connect-feed/connect-feed.1.adm       |   3 +
 .../results/feeds/feeds_01/feeds_01.1.adm       |   2 +-
 .../results/feeds/feeds_03/feeds_03.1.adm       |   2 +-
 .../results/feeds/start-feed/start-feed.1.adm   |   2 +
 .../results/types/any-object/any-object.2.adm   | 137 ++--
 .../src/test/resources/runtimets/testsuite.xml  |  22 +
 .../asterix/common/metadata/IDataset.java       |  23 +
 .../client/FileFeedSocketAdapterClient.java     |  13 +-
 .../external/feed/api/FeedOperationCounter.java |  59 --
 .../api/IActiveLifecycleEventSubscriber.java    |  40 --
 .../apache/asterix/external/feed/api/IFeed.java |   6 -
 .../asterix/external/feed/api/IFeedJoint.java   | 122 ----
 .../asterix/external/feed/api/IFeedWork.java    |  28 -
 .../feed/api/IFeedWorkEventListener.java        |  41 --
 .../external/feed/api/IFeedWorkManager.java     |  25 -
 .../external/feed/api/ISubscribableRuntime.java |  42 --
 .../ActiveLifecycleEventSubscriber.java         |  69 --
 .../feed/management/FeedConnectionId.java       |   2 +-
 .../feed/management/FeedConnectionRequest.java  |  59 +-
 .../feed/management/FeedEventsListener.java     | 646 +++----------------
 .../external/feed/management/FeedInfo.java      |  53 --
 .../external/feed/management/FeedJointKey.java  |  83 ---
 .../feed/management/FeedWorkManager.java        |  50 --
 .../external/feed/message/EndFeedMessage.java   | 100 ---
 .../external/feed/message/FeedMessage.java      |  42 --
 .../feed/runtime/CollectionRuntime.java         | 100 ---
 .../external/feed/runtime/IngestionRuntime.java |  70 +-
 .../feed/runtime/SubscribableRuntime.java       |  59 --
 .../external/feed/watch/FeedConnectJobInfo.java | 108 ----
 .../feed/watch/FeedEventSubscriber.java         |  64 ++
 .../external/feed/watch/FeedIntakeInfo.java     |  68 --
 .../external/feed/watch/NoOpSubscriber.java     |  54 ++
 .../twitter/TwitterRecordReaderFactory.java     |   9 +
 .../factory/SocketServerInputStreamFactory.java |   4 +-
 .../FeedCollectOperatorDescriptor.java          |  25 +-
 .../FeedCollectOperatorNodePushable.java        |  51 +-
 .../operators/FeedIntakeOperatorDescriptor.java |  46 +-
 .../FeedIntakeOperatorNodePushable.java         |  65 +-
 .../FeedMessageOperatorDescriptor.java          |  56 --
 .../FeedMessageOperatorNodePushable.java        | 173 -----
 .../apache/asterix/external/util/FeedUtils.java |   2 +
 .../external/feed/test/InputHandlerTest.java    |   3 +-
 .../typed_adapter/typed_adapter.2.update.aql    |   2 +
 .../feed_ingest/feed_ingest.1.ddl.aql           |   7 +-
 .../feed_ingest/feed_ingest.2.update.aql        |   9 +-
 .../feed_ingest/feed_ingest.4.query.aql         |   4 +-
 .../issue-1636/issue-1636.02.ddl.aql            |   2 +
 .../issue-1636/issue-1636.04.ddl.aql            |   1 +
 .../issue-1636/issue-1636.08.ddl.aql            |   2 +-
 .../dataset-with-meta-record.3.update.aql       |   3 +-
 .../dataset-with-meta-record.5.adm              |   2 +-
 .../aql/statement/SubscribeFeedStatement.java   |  51 +-
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |  37 +-
 .../asterix/lang/common/base/Statement.java     |  13 +-
 .../common/statement/ConnectFeedStatement.java  |  40 +-
 .../common/statement/CreateFeedStatement.java   |  40 +-
 .../statement/CreatePrimaryFeedStatement.java   |  59 --
 .../statement/CreateSecondaryFeedStatement.java |  62 --
 .../common/statement/StartFeedStatement.java    |  61 ++
 .../common/statement/StopFeedStatement.java     |  59 ++
 .../lang/common/visitor/FormatPrintVisitor.java |  48 +-
 .../base/AbstractQueryExpressionVisitor.java    |  14 +-
 .../lang/common/visitor/base/ILangVisitor.java  |  11 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj |  70 +-
 .../apache/asterix/metadata/MetadataCache.java  | 131 ++--
 .../asterix/metadata/MetadataManager.java       |  51 +-
 .../apache/asterix/metadata/MetadataNode.java   |  66 +-
 .../metadata/MetadataTransactionContext.java    |  20 +-
 .../asterix/metadata/api/IMetadataManager.java  |  17 +-
 .../asterix/metadata/api/IMetadataNode.java     |  11 +
 .../metadata/bootstrap/MetadataBootstrap.java   |   2 +-
 .../bootstrap/MetadataPrimaryIndexes.java       |  11 +-
 .../metadata/bootstrap/MetadataRecordTypes.java | 111 +---
 .../metadata/declared/FeedDataSource.java       |  17 +-
 .../metadata/declared/MetadataManagerUtil.java  |  10 +
 .../metadata/declared/MetadataProvider.java     |   6 +
 .../asterix/metadata/entities/Dataset.java      |   7 +-
 .../apache/asterix/metadata/entities/Feed.java  |  31 +-
 .../metadata/entities/FeedConnection.java       | 117 ++++
 .../FeedConnectionTupleTranslator.java          | 179 +++++
 .../FeedTupleTranslator.java                    | 195 ++----
 .../metadata/feeds/FeedMetadataUtil.java        | 250 -------
 .../asterix/metadata/feeds/FeedOperations.java  | 182 ------
 .../metadata/utils/MetadataLockManager.java     |  29 +
 .../typed_adapter/typed_adapter.1.ddl.aql       |   4 +-
 .../typed_adapter/typed_adapter.2.update.aql    |   6 +-
 .../typed_adapter/typed_adapter.3.query.aql     |   4 +-
 .../feed_ingest/feed_ingest.1.ddl.aql           |  11 +-
 .../feed_ingest/feed_ingest.2.update.aql        |   7 +-
 .../feed_ingest/feed_ingest.3.query.aql         |   4 +-
 198 files changed, 3105 insertions(+), 4287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index c907a36..2669990 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -22,25 +22,27 @@ import org.apache.hyracks.api.job.JobId;
 
 public class ActiveEvent {
 
+    public enum Kind {
+        JOB_CREATED,
+        JOB_STARTED,
+        JOB_FINISHED,
+        PARTITION_EVENT,
+        EXTENSION_EVENT
+    }
+
     private final JobId jobId;
     private final EntityId entityId;
-    private final EventKind eventKind;
+    private final Kind eventKind;
     private final Object eventObject;
 
-    public enum EventKind {
-        JOB_START,
-        JOB_FINISH,
-        PARTITION_EVENT
-    }
-
-    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId entityId, Object eventObject) {
+    public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId, Object eventObject) {
         this.jobId = jobId;
         this.entityId = entityId;
         this.eventKind = eventKind;
         this.eventObject = eventObject;
     }
 
-    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId entityId) {
+    public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) {
         this(jobId, eventKind, entityId, null);
     }
 
@@ -52,11 +54,16 @@ public class ActiveEvent {
         return entityId;
     }
 
-    public EventKind getEventKind() {
+    public Kind getEventKind() {
         return eventKind;
     }
 
     public Object getEventObject() {
         return eventObject;
     }
+
+    @Override
+    public String toString() {
+        return "JobId:" + jobId + ", " + "EntityId:" + entityId + ", " + "Kind" + eventKind;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
deleted file mode 100644
index 1e3eca1..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.Serializable;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class ActiveJob implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(ActiveJob.class.getName());
-    protected final EntityId entityId;
-    protected JobId jobId;
-    protected final Serializable jobObject;
-    protected ActivityState state;
-    protected JobSpecification spec;
-
-    public ActiveJob(EntityId entityId, JobId jobId, ActivityState state, Serializable jobInfo, JobSpecification spec) {
-        this.entityId = entityId;
-        this.state = state;
-        this.jobId = jobId;
-        this.jobObject = jobInfo;
-        this.spec = spec;
-    }
-
-    public ActiveJob(EntityId entityId, ActivityState state, Serializable jobInfo, JobSpecification spec) {
-        this.entityId = entityId;
-        this.state = state;
-        this.jobObject = jobInfo;
-        this.spec = spec;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(JobId jobId) {
-        this.jobId = jobId;
-    }
-
-    public ActivityState getState() {
-        return state;
-    }
-
-    public void setState(ActivityState state) {
-        this.state = state;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(this + " is in " + state + " state.");
-        }
-    }
-
-    public Object getJobObject() {
-        return jobObject;
-    }
-
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public void setSpec(JobSpecification spec) {
-        this.spec = spec;
-    }
-
-    @Override
-    public String toString() {
-        return jobId + " [" + jobObject + "]";
-    }
-
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 0d1d8ab..e4491bd 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -24,7 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveEvent.EventKind;
+import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -36,7 +36,7 @@ public class ActiveJobNotificationHandler implements Runnable {
     private static final boolean DEBUG = false;
     private final LinkedBlockingQueue<ActiveEvent> eventInbox;
     private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
-    private final Map<JobId, ActiveJob> jobId2ActiveJobInfos;
+    private final Map<JobId, EntityId> jobId2ActiveJobInfos;
 
     private ActiveJobNotificationHandler() {
         this.eventInbox = new LinkedBlockingQueue<>();
@@ -51,16 +51,20 @@ public class ActiveJobNotificationHandler implements Runnable {
         while (!Thread.interrupted()) {
             try {
                 ActiveEvent event = getEventInbox().take();
-                ActiveJob jobInfo = jobId2ActiveJobInfos.get(event.getJobId());
-                EntityId entityId = jobInfo.getEntityId();
-                IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
-                if (DEBUG) {
-                    LOGGER.log(Level.WARNING, "Next event is of type " + event.getEventKind());
-                    LOGGER.log(Level.WARNING, "Notifying the listener");
-                }
-                listener.notify(event);
-                if (event.getEventKind() == EventKind.JOB_FINISH) {
-                    removeJob(event.getJobId(), listener);
+                EntityId entityId = jobId2ActiveJobInfos.get(event.getJobId());
+                if (entityId != null) {
+                    IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
+                    LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind());
+                    LOGGER.log(Level.FINER, "Notifying the listener");
+                    listener.notify(event);
+                    if (event.getEventKind() == Kind.JOB_FINISHED) {
+                        LOGGER.log(Level.FINER, "Removing the job");
+                        jobId2ActiveJobInfos.remove(event.getJobId());
+                        LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore");
+                        entityEventListeners.remove(listener.getEntityId());
+                    }
+                } else {
+                    LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId());
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -71,29 +75,6 @@ public class ActiveJobNotificationHandler implements Runnable {
         LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
     }
 
-    public void removeJob(JobId jobId, IActiveEntityEventsListener listener) {
-        removeFinishedJob(jobId, listener);
-        removeInactiveListener(listener);
-    }
-
-    private void removeFinishedJob(JobId jobId, IActiveEntityEventsListener listener) {
-        if (!listener.isEntityActive()) {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Remove job" + jobId);
-            }
-            jobId2ActiveJobInfos.remove(jobId);
-        }
-    }
-
-    private void removeInactiveListener(IActiveEntityEventsListener listener) {
-        if (!listener.isEntityActive()) {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Removing the listener since it is not active anymore");
-            }
-            entityEventListeners.remove(listener.getEntityId());
-        }
-    }
-
     public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
@@ -103,61 +84,27 @@ public class ActiveJobNotificationHandler implements Runnable {
         return entityEventListeners.get(entityId);
     }
 
-    public synchronized ActiveJob[] getActiveJobs() {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getActiveJobs()  was called");
-            LOGGER.log(Level.WARNING, "Number of jobs found: " + jobId2ActiveJobInfos.size());
-        }
-        return jobId2ActiveJobInfos.values().toArray(new ActiveJob[jobId2ActiveJobInfos.size()]);
-    }
-
-    public boolean isActiveJob(JobId jobId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "isActiveJob(JobId jobId) called with jobId: " + jobId);
-            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
-        }
-        return jobId2ActiveJobInfos.get(jobId) != null;
-    }
-
     public EntityId getEntity(JobId jobId) {
-        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
-        return jobInfo == null ? null : jobInfo.getEntityId();
+        return jobId2ActiveJobInfos.get(jobId);
     }
 
     public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = "
-                            + jobId);
-        }
+        LOGGER.log(Level.FINER,
+                "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId);
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
-        if (property == null || !(property instanceof ActiveJob)) {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Job was is not active. property found to be: " + property);
-            }
+        if (property == null || !(property instanceof EntityId)) {
+            LOGGER.log(Level.FINER, "Job was is not active. property found to be: " + property);
             return;
-        } else {
-            monitorJob(jobId, (ActiveJob) property);
         }
-        if (DEBUG) {
-            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
-        }
-        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
-        if (jobInfo != null) {
-            EntityId entityId = jobInfo.getEntityId();
-            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
-            listener.notifyJobCreation(jobId, jobSpecification);
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Listener was notified" + jobId);
-            }
-        } else {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING,
-                        "Listener was not notified since it was not registered for the job " + jobId);
-            }
+        EntityId entityId = (EntityId) property;
+        monitorJob(jobId, entityId);
+        boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+        LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
+        if (listener != null) {
+            listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
         }
+        LOGGER.log(Level.FINER, "Listener was notified" + jobId);
     }
 
     public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
@@ -174,9 +121,8 @@ public class ActiveJobNotificationHandler implements Runnable {
 
     public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
         if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "registerListener(IActiveEntityEventsListener listener) was called for the entity "
-                            + listener.getEntityId());
+            LOGGER.log(Level.FINER, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
+                    + listener.getEntityId());
         }
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new HyracksDataException(
@@ -185,13 +131,23 @@ public class ActiveJobNotificationHandler implements Runnable {
         entityEventListeners.put(listener.getEntityId(), listener);
     }
 
-    public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
+    public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+        LOGGER.log(Level.FINER, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+                + listener.getEntityId());
+        IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
+        if (registeredListener == null) {
+            throw new HyracksDataException(
+                    "Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
+        }
+    }
+
+    public synchronized void monitorJob(JobId jobId, EntityId activeJob) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
             boolean found = jobId2ActiveJobInfos.get(jobId) != null;
             LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
         }
-        if (entityEventListeners.containsKey(activeJob.getEntityId())) {
+        if (entityEventListeners.containsKey(activeJob)) {
             if (jobId2ActiveJobInfos.containsKey(jobId)) {
                 LOGGER.severe("Job is already being monitored for job: " + jobId);
                 return;
@@ -199,22 +155,9 @@ public class ActiveJobNotificationHandler implements Runnable {
             if (DEBUG) {
                 LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId);
             }
-            jobId2ActiveJobInfos.put(jobId, activeJob);
         } else {
-            LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
-        }
-    }
-
-    public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
-                            + listener.getEntityId());
-        }
-        IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
-        if (registeredListener == null) {
-            throw new HyracksDataException(
-                    "Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
+            LOGGER.severe("No listener was found for the entity: " + activeJob);
         }
+        jobId2ActiveJobInfos.put(jobId, activeJob);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
index fad30fa..6a10b0c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -48,7 +49,7 @@ public class ActiveLifecycleListener implements IJobLifecycleListener {
     public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
         EntityId entityId = ActiveJobNotificationHandler.INSTANCE.getEntity(jobId);
         if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_START, entityId));
+            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId));
         }
     }
 
@@ -56,7 +57,7 @@ public class ActiveLifecycleListener implements IJobLifecycleListener {
     public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
         EntityId entityId = ActiveJobNotificationHandler.INSTANCE.getEntity(jobId);
         if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_FINISH, entityId));
+            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId));
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
@@ -70,10 +71,8 @@ public class ActiveLifecycleListener implements IJobLifecycleListener {
     }
 
     public void receive(ActivePartitionMessage message) {
-        if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(message.getJobId())) {
-            jobEventInbox.add(new ActiveEvent(message.getJobId(), ActiveEvent.EventKind.PARTITION_EVENT,
-                    message.getActiveRuntimeId().getEntityId(), message));
-        }
+        jobEventInbox.add(new ActiveEvent(message.getJobId(), Kind.PARTITION_EVENT,
+                message.getActiveRuntimeId().getEntityId(), message));
     }
 
     public void stop() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index 1301535..c8abb84 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -18,11 +18,25 @@
  */
 package org.apache.asterix.active;
 
-// TODO: Document the state machine and its transition.
 public enum ActivityState {
-    CREATED,
-    INACTIVE,
-    ACTIVE,
-    UNDER_RECOVERY,
-    ENDED
+    /**
+     * The starting state and a possible terminal state. Next state can only be {@code ActivityState.STARTING}
+     */
+    STOPPED,
+    /**
+     * A terminal state
+     */
+    FAILED,
+    /**
+     * An intermediate state. Next state can only be {@code ActivityState.STARTED} or {@code ActivityState.FAILED}
+     */
+    STARTING,
+    /**
+     * An intermediate state. Next state can only be {@code ActivityState.STOPPING} or {@code ActivityState.FAILED}
+     */
+    STARTED,
+    /**
+     * An intermediate state. Next state can only be {@code ActivityState.STOPPED} or {@code ActivityState.FAILED}
+     */
+    STOPPING
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index d0fb5e8..ee8e776 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -18,19 +18,48 @@
  */
 package org.apache.asterix.active;
 
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IActiveEntityEventsListener {
 
-    public void notify(ActiveEvent message);
+    /**
+     * Notify the listener that an event related to the entity has taken place
+     * Examples of such events include
+     * 1. Job created
+     * 2. Job completed
+     * 3. Partition event
+     *
+     * @param event
+     *            the event that took place
+     */
+    void notify(ActiveEvent event);
 
-    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification);
+    /**
+     * @return the state of the entity
+     */
+    ActivityState getState();
 
-    public boolean isEntityActive();
+    /**
+     * get a subscriber that waits till state has been reached.
+     *
+     * @param state
+     *            the desired state
+     * @throws HyracksDataException
+     *             a failure happened while waiting for the state
+     */
+    IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException;
 
-    public EntityId getEntityId();
+    /**
+     * @return the active entity id
+     */
+    EntityId getEntityId();
 
-    public boolean isEntityUsingDataset(String dataverseName, String datasetName);
+    /**
+     * dataset
+     *
+     * @return
+     */
+    boolean isEntityUsingDataset(IDataset dataset);
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
new file mode 100644
index 0000000..7be5737
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+/**
+ * An active event subscriber that subscribe to events related to active entity
+ */
+public interface IActiveEventSubscriber {
+
+    /**
+     * Notify the subscriber of a new event
+     * @param event
+     */
+    void notify(ActiveEvent event);
+
+    /**
+     * Checkcs whether the subscriber is done receiving events
+     * @return
+     */
+    boolean done();
+
+    /**
+     * Wait until the terminal event has been received
+     * @throws InterruptedException
+     */
+    void sync() throws InterruptedException;
+
+    /**
+     * Stop watching events
+     */
+    void unsubscribe();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 6ff8a56..1a0ecd9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -21,6 +21,7 @@ package org.apache.asterix.optimizer.rules;
 import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.FeedDataSource;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -59,8 +60,8 @@ public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebrai
         }
 
         final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
-        Feed feed = feedDataSource.getFeed();
-        if (feed.getAppliedFunction() == null) {
+        FeedConnection feedConnection = feedDataSource.getFeedConnection();
+        if (feedConnection.getAppliedFunctions() == null) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 74e6ec5..46b421b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -33,6 +33,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
@@ -214,6 +215,8 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
         Dataset dataset = metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
         ARecordType feedOutputType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), outputType);
         Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
+        FeedConnection feedConnection = metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName,
+                targetDataset);
         ARecordType metaType = null;
         // Does dataset have meta?
         if (dataset.hasMetaPart()) {
@@ -260,8 +263,8 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
         }
         FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, targetDataset, feedOutputType, metaType,
                 pkTypes, partitioningKeys, keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
-                sourceFeed.getFeedType(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
-                context.getComputationNodeDomain());
+                 FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+                context.getComputationNodeDomain(), feedConnection);
         feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
         return feedDataSource;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java
deleted file mode 100644
index 0614d14..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.app.external.FeedWorkCollection.SubscribeFeedWork;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.management.FeedWorkManager;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-
-public class FeedJoint implements IFeedJoint {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJoint.class.getName());
-
-    /** A unique key associated with the feed point **/
-    private final FeedJointKey key;
-
-    /** The state associated with the FeedJoint **/
-    private State state;
-
-    /** A list of subscribers that receive data from this FeedJoint **/
-    private final List<FeedConnectionId> receivers;
-
-    /** The feedId on which the feedPoint resides **/
-    private final EntityId ownerFeedId;
-
-    /** A list of feed subscription requests submitted for subscribing to the FeedPoint's data **/
-    private final List<FeedConnectionRequest> connectionRequests;
-
-    private final FeedRuntimeType connectionLocation;
-
-    private final FeedJointType type;
-
-    private FeedConnectionId provider;
-
-    public FeedJoint(FeedJointKey key, EntityId ownerFeedId, FeedRuntimeType subscriptionLocation, FeedJointType type,
-            FeedConnectionId provider) {
-        this.key = key;
-        this.ownerFeedId = ownerFeedId;
-        this.type = type;
-        this.receivers = new ArrayList<FeedConnectionId>();
-        this.state = State.CREATED;
-        this.connectionLocation = subscriptionLocation;
-        this.connectionRequests = new ArrayList<FeedConnectionRequest>();
-        this.provider = provider;
-    }
-
-    @Override
-    public int hashCode() {
-        return key.hashCode();
-    }
-
-    @Override
-    public void addReceiver(FeedConnectionId connectionId) {
-        receivers.add(connectionId);
-    }
-
-    @Override
-    public void removeReceiver(FeedConnectionId connectionId) {
-        receivers.remove(connectionId);
-    }
-
-    @Override
-    public synchronized void addConnectionRequest(FeedConnectionRequest request) {
-        connectionRequests.add(request);
-        if (state.equals(State.ACTIVE)) {
-            handlePendingConnectionRequest();
-        }
-    }
-
-    @Override
-    public synchronized void setState(State state) {
-        if (this.state.equals(state)) {
-            return;
-        }
-        this.state = state;
-        if (this.state.equals(State.ACTIVE)) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Feed joint " + this + " is now " + State.ACTIVE);
-            }
-            handlePendingConnectionRequest();
-        }
-    }
-
-    private void handlePendingConnectionRequest() {
-        for (FeedConnectionRequest connectionRequest : connectionRequests) {
-            FeedConnectionId connectionId =
-                    new FeedConnectionId(connectionRequest.getReceivingFeedId(), connectionRequest.getTargetDataset());
-            try {
-                FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
-                        .getActiveEntityListener(connectionId.getFeedId());
-                SubscribeFeedWork work = new SubscribeFeedWork(
-                        listener.getConnectionLocations(this, connectionRequest).toArray(new String[] {}),
-                        connectionRequest);
-                FeedWorkManager.INSTANCE.submitWork(work, new SubscribeFeedWork.FeedSubscribeWorkEventListener());
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Submitted feed connection request " + connectionRequest + " at feed joint " + this);
-                }
-                addReceiver(connectionId);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unsuccessful attempt at submitting connection request " + connectionRequest
-                            + " at feed joint " + this + ". Message " + e.getMessage());
-                }
-                e.printStackTrace();
-            }
-        }
-        connectionRequests.clear();
-    }
-
-    @Override
-    public FeedConnectionId getReceiver(FeedConnectionId connectionId) {
-        for (FeedConnectionId cid : receivers) {
-            if (cid.equals(connectionId)) {
-                return cid;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return key.toString() + " [" + connectionLocation + "]" + "[" + state + "]";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) {
-            return false;
-        }
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof FeedJoint)) {
-            return false;
-        }
-        return ((FeedJoint) o).getFeedJointKey().equals(this.key);
-    }
-
-    @Override
-    public EntityId getOwnerFeedId() {
-        return ownerFeedId;
-    }
-
-    @Override
-    public List<FeedConnectionRequest> getConnectionRequests() {
-        return connectionRequests;
-    }
-
-    @Override
-    public FeedRuntimeType getConnectionLocation() {
-        return connectionLocation;
-    }
-
-    @Override
-    public FeedJointType getType() {
-        return type;
-    }
-
-    @Override
-    public FeedConnectionId getProvider() {
-        return provider;
-    }
-
-    @Override
-    public List<FeedConnectionId> getReceivers() {
-        return receivers;
-    }
-
-    public FeedJointKey getKey() {
-        return key;
-    }
-
-    @Override
-    public synchronized State getState() {
-        return state;
-    }
-
-    @Override
-    public FeedJointKey getFeedJointKey() {
-        return key;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
deleted file mode 100644
index ec7c239..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.api.IFeedWork;
-import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus;
-import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.runtime.utils.AppContextInfo;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
-import org.apache.asterix.translator.SessionConfig.OutputFormat;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-/**
- * A collection of feed management related task, each represented as an implementation of {@code IFeedWork}.
- */
-public class FeedWorkCollection {
-
-    private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName());
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-    private static final IStorageComponentProvider storageComponentProvider = new StorageComponentProvider();
-
-    /**
-     * The task of subscribing to a feed to obtain data.
-     */
-    public static class SubscribeFeedWork implements IFeedWork {
-
-        private final Runnable runnable;
-
-        private final FeedConnectionRequest request;
-
-        @Override
-        public Runnable getRunnable() {
-            return runnable;
-        }
-
-        public SubscribeFeedWork(String[] locations, FeedConnectionRequest request) {
-            this.runnable = new SubscribeFeedWorkRunnable(locations, request);
-            this.request = request;
-        }
-
-        private static class SubscribeFeedWorkRunnable implements Runnable {
-
-            private static final DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory();
-            private final FeedConnectionRequest request;
-            private final String[] locations;
-
-            public SubscribeFeedWorkRunnable(String[] locations, FeedConnectionRequest request) {
-                this.request = request;
-                this.locations = locations;
-            }
-
-            @Override
-            public void run() {
-                try {
-                    //TODO(amoudi): route PrintWriter to log file
-                    PrintWriter writer = new PrintWriter(System.err, true);
-                    SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-                    DataverseDecl dataverseDecl = new DataverseDecl(
-                            new Identifier(request.getReceivingFeedId().getDataverse()));
-                    SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request);
-                    List<Statement> statements = new ArrayList<>();
-                    statements.add(dataverseDecl);
-                    statements.add(subscribeStmt);
-                    IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider,
-                            storageComponentProvider);
-                    translator.compileAndExecute(AppContextInfo.INSTANCE.getHcc(), null,
-                            QueryTranslator.ResultDelivery.IMMEDIATE);
-                    if (LOGGER.isEnabledFor(Level.INFO)) {
-                        LOGGER.info("Submitted connection requests for execution: " + request);
-                    }
-                } catch (Exception e) {
-                    if (LOGGER.isEnabledFor(Level.FATAL)) {
-                        LOGGER.fatal("Exception in executing " + request, e);
-                    }
-                }
-            }
-        }
-
-        public static class FeedSubscribeWorkEventListener implements IFeedWorkEventListener {
-
-            @Override
-            public void workFailed(IFeedWork work, Exception e) {
-                if (LOGGER.isEnabledFor(Level.WARN)) {
-                    LOGGER.warn(" Feed subscription request " + ((SubscribeFeedWork) work).request
-                            + " failed with exception " + e);
-                }
-            }
-
-            @Override
-            public void workCompleted(IFeedWork work) {
-                ((SubscribeFeedWork) work).request.setSubscriptionStatus(ConnectionStatus.ACTIVE);
-                if (LOGGER.isEnabledFor(Level.INFO)) {
-                    LOGGER.info(" Feed subscription request " + ((SubscribeFeedWork) work).request + " completed ");
-                }
-            }
-        }
-
-        public FeedConnectionRequest getRequest() {
-            return request;
-        }
-
-        @Override
-        public String toString() {
-            return "SubscribeFeedWork for [" + request + "]";
-        }
-    }
-}


Mime
View raw message