asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/9] asterixdb git commit: Feed Connection Refactoring
Date Sun, 19 Feb 2017 07:14:48 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index 78a7d6f..01ade10 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -35,6 +35,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
@@ -53,28 +54,34 @@ public class MetadataCache {
     // Default life time period of a temp dataset. It is 30 days.
     private final static long TEMP_DATASET_INACTIVE_TIME_THRESHOLD = 3600 * 24 * 30 * 1000L;
     // Key is dataverse name.
-    protected final Map<String, Dataverse> dataverses = new HashMap<String, Dataverse>();
+    protected final Map<String, Dataverse> dataverses = new HashMap<>();
     // Key is dataverse name. Key of value map is dataset name.
-    protected final Map<String, Map<String, Dataset>> datasets = new HashMap<String, Map<String, Dataset>>();
+    protected final Map<String, Map<String, Dataset>> datasets = new HashMap<>();
     // Key is dataverse name. Key of value map is dataset name. Key of value map of value map is index name.
-    protected final Map<String, Map<String, Map<String, Index>>> indexes = new HashMap<String, Map<String, Map<String, Index>>>();
+    protected final Map<String, Map<String, Map<String, Index>>> indexes =
+            new HashMap<>();
     // Key is dataverse name. Key of value map is datatype name.
-    protected final Map<String, Map<String, Datatype>> datatypes = new HashMap<String, Map<String, Datatype>>();
+    protected final Map<String, Map<String, Datatype>> datatypes = new HashMap<>();
     // Key is dataverse name.
-    protected final Map<String, NodeGroup> nodeGroups = new HashMap<String, NodeGroup>();
+    protected final Map<String, NodeGroup> nodeGroups = new HashMap<>();
     // Key is function Identifier . Key of value map is function name.
-    protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>();
+    protected final Map<FunctionSignature, Function> functions = new HashMap<>();
     // Key is adapter dataverse. Key of value map is the adapter name
-    protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>();
+    protected final Map<String, Map<String, DatasourceAdapter>> adapters =
+            new HashMap<>();
 
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = new HashMap<String, Map<String, FeedPolicyEntity>>();
+    protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies =
+            new HashMap<>();
     // Key is library dataverse. Key of value map is the library name
-    protected final Map<String, Map<String, Library>> libraries = new HashMap<String, Map<String, Library>>();
+    protected final Map<String, Map<String, Library>> libraries = new HashMap<>();
     // Key is library dataverse. Key of value map is the feed name
-    protected final Map<String, Map<String, Feed>> feeds = new HashMap<String, Map<String, Feed>>();
+    protected final Map<String, Map<String, Feed>> feeds = new HashMap<>();
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies = new HashMap<String, Map<String, CompactionPolicy>>();
+    protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies =
+            new HashMap<>();
+    // Key is DataverseName, Key of value map is feedConnectionId
+    protected final Map<String, Map<String, FeedConnection>> feedConnections = new HashMap<>();
 
     // Atomically executes all metadata operations in ctx's log.
     public void commit(MetadataTransactionContext ctx) {
@@ -163,7 +170,7 @@ public class MetadataCache {
 
                 Map<String, Dataset> m = datasets.get(dataset.getDataverseName());
                 if (m == null) {
-                    m = new HashMap<String, Dataset>();
+                    m = new HashMap<>();
                     datasets.put(dataset.getDataverseName(), m);
                 }
                 if (!m.containsKey(dataset.getDatasetName())) {
@@ -184,7 +191,7 @@ public class MetadataCache {
         synchronized (datatypes) {
             Map<String, Datatype> m = datatypes.get(datatype.getDataverseName());
             if (m == null) {
-                m = new HashMap<String, Datatype>();
+                m = new HashMap<>();
                 datatypes.put(datatype.getDataverseName(), m);
             }
             if (!m.containsKey(datatype.getDatatypeName())) {
@@ -207,7 +214,7 @@ public class MetadataCache {
         synchronized (compactionPolicy) {
             Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName());
             if (p == null) {
-                p = new HashMap<String, CompactionPolicy>();
+                p = new HashMap<>();
                 p.put(compactionPolicy.getPolicyName(), compactionPolicy);
                 compactionPolicies.put(compactionPolicy.getDataverseName(), p);
             } else {
@@ -244,7 +251,8 @@ public class MetadataCache {
                                             datatypes.remove(dataverse.getDataverseName());
                                             adapters.remove(dataverse.getDataverseName());
                                             compactionPolicies.remove(dataverse.getDataverseName());
-                                            List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
+                                            List<FunctionSignature> markedFunctionsForRemoval =
+                                                    new ArrayList<>();
                                             for (FunctionSignature signature : functions.keySet()) {
                                                 if (signature.getNamespace().equals(dataverse.getDataverseName())) {
                                                     markedFunctionsForRemoval.add(signature);
@@ -371,7 +379,7 @@ public class MetadataCache {
     }
 
     public List<Dataset> getDataverseDatasets(String dataverseName) {
-        List<Dataset> retDatasets = new ArrayList<Dataset>();
+        List<Dataset> retDatasets = new ArrayList<>();
         synchronized (datasets) {
             Map<String, Dataset> m = datasets.get(dataverseName);
             if (m == null) {
@@ -385,7 +393,7 @@ public class MetadataCache {
     }
 
     public List<Index> getDatasetIndexes(String dataverseName, String datasetName) {
-        List<Index> retIndexes = new ArrayList<Index>();
+        List<Index> retIndexes = new ArrayList<>();
         synchronized (datasets) {
             Map<String, Index> map = indexes.get(dataverseName).get(datasetName);
             if (map == null) {
@@ -398,28 +406,13 @@ public class MetadataCache {
         }
     }
 
-    /**
-     * Represents a logical operation against the metadata.
-     */
-    protected class MetadataLogicalOperation {
-        // Entity to be added/dropped.
-        public final IMetadataEntity<?> entity;
-        // True for add, false for drop.
-        public final boolean isAdd;
-
-        public MetadataLogicalOperation(IMetadataEntity<?> entity, boolean isAdd) {
-            this.entity = entity;
-            this.isAdd = isAdd;
-        }
-    };
-
     protected void doOperation(MetadataLogicalOperation op) {
         if (op.isAdd) {
             op.entity.addToCache(this);
         } else {
             op.entity.dropFromCache(this);
         }
-    }
+    };
 
     protected void undoOperation(MetadataLogicalOperation op) {
         if (!op.isAdd) {
@@ -431,8 +424,8 @@ public class MetadataCache {
 
     public Function addFunctionIfNotExists(Function function) {
         synchronized (functions) {
-            FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(),
-                    function.getArity());
+            FunctionSignature signature =
+                    new FunctionSignature(function.getDataverseName(), function.getName(), function.getArity());
             Function fun = functions.get(signature);
             if (fun == null) {
                 return functions.put(signature, function);
@@ -443,8 +436,8 @@ public class MetadataCache {
 
     public Function dropFunction(Function function) {
         synchronized (functions) {
-            FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(),
-                    function.getArity());
+            FunctionSignature signature =
+                    new FunctionSignature(function.getDataverseName(), function.getName(), function.getArity());
             Function fun = functions.get(signature);
             if (fun == null) {
                 return null;
@@ -457,7 +450,7 @@ public class MetadataCache {
         synchronized (feedPolicy) {
             Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName());
             if (p == null) {
-                p = new HashMap<String, FeedPolicyEntity>();
+                p = new HashMap<>();
                 p.put(feedPolicy.getPolicyName(), feedPolicy);
                 feedPolicies.put(feedPolicy.getDataverseName(), p);
             } else {
@@ -481,10 +474,10 @@ public class MetadataCache {
 
     public DatasourceAdapter addAdapterIfNotExists(DatasourceAdapter adapter) {
         synchronized (adapters) {
-            Map<String, DatasourceAdapter> adaptersInDataverse = adapters
-                    .get(adapter.getAdapterIdentifier().getNamespace());
+            Map<String, DatasourceAdapter> adaptersInDataverse =
+                    adapters.get(adapter.getAdapterIdentifier().getNamespace());
             if (adaptersInDataverse == null) {
-                adaptersInDataverse = new HashMap<String, DatasourceAdapter>();
+                adaptersInDataverse = new HashMap<>();
                 adapters.put(adapter.getAdapterIdentifier().getNamespace(), adaptersInDataverse);
             }
             DatasourceAdapter adapterObject = adaptersInDataverse.get(adapter.getAdapterIdentifier().getName());
@@ -497,8 +490,8 @@ public class MetadataCache {
 
     public DatasourceAdapter dropAdapter(DatasourceAdapter adapter) {
         synchronized (adapters) {
-            Map<String, DatasourceAdapter> adaptersInDataverse = adapters
-                    .get(adapter.getAdapterIdentifier().getNamespace());
+            Map<String, DatasourceAdapter> adaptersInDataverse =
+                    adapters.get(adapter.getAdapterIdentifier().getNamespace());
             if (adaptersInDataverse != null) {
                 return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getName());
             }
@@ -512,7 +505,7 @@ public class MetadataCache {
             boolean needToAddd = (libsInDataverse == null || libsInDataverse.get(library.getName()) != null);
             if (needToAddd) {
                 if (libsInDataverse == null) {
-                    libsInDataverse = new HashMap<String, Library>();
+                    libsInDataverse = new HashMap<>();
                     libraries.put(library.getDataverseName(), libsInDataverse);
                 }
                 return libsInDataverse.put(library.getDataverseName(), library);
@@ -531,8 +524,37 @@ public class MetadataCache {
         }
     }
 
+    public FeedConnection addFeedConnectionIfNotExists(FeedConnection feedConnection) {
+        synchronized (feedConnections) {
+            Map<String, FeedConnection> feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName());
+            if (feedConnsInDataverse == null) {
+                feedConnections.put(feedConnection.getDataverseName(), new HashMap<>());
+                feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName());
+            }
+            return feedConnsInDataverse.put(feedConnection.getConnectionId(), feedConnection);
+        }
+    }
+
+    public FeedConnection dropFeedConnection(FeedConnection feedConnection) {
+        synchronized (feedConnections) {
+            Map<String, FeedConnection> feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName());
+            if (feedConnsInDataverse != null) {
+                return feedConnsInDataverse.remove(feedConnection.getConnectionId());
+            } else {
+                return null;
+            }
+        }
+    }
+
     public Feed addFeedIfNotExists(Feed feed) {
-        return null;
+        synchronized (feeds) {
+            Map<String, Feed> feedsInDataverse = feeds.get(feed.getDataverseName());
+            if (feedsInDataverse == null) {
+                feeds.put(feed.getDataverseName(), new HashMap<>());
+                feedsInDataverse = feeds.get(feed.getDataverseName());
+            }
+            return feedsInDataverse.put(feed.getFeedName(), feed);
+        }
     }
 
     public Feed dropFeed(Feed feed) {
@@ -548,12 +570,12 @@ public class MetadataCache {
     private Index addIndexIfNotExistsInternal(Index index) {
         Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
         if (datasetMap == null) {
-            datasetMap = new HashMap<String, Map<String, Index>>();
+            datasetMap = new HashMap<>();
             indexes.put(index.getDataverseName(), datasetMap);
         }
         Map<String, Index> indexMap = datasetMap.get(index.getDatasetName());
         if (indexMap == null) {
-            indexMap = new HashMap<String, Index>();
+            indexMap = new HashMap<>();
             datasetMap.put(index.getDatasetName(), indexMap);
         }
         if (!indexMap.containsKey(index.getIndexName())) {
@@ -583,4 +605,19 @@ public class MetadataCache {
             }
         }
     }
+
+    /**
+     * Represents a logical operation against the metadata.
+     */
+    protected class MetadataLogicalOperation {
+        // Entity to be added/dropped.
+        public final IMetadataEntity<?> entity;
+        // True for add, false for drop.
+        public final boolean isAdd;
+
+        public MetadataLogicalOperation(IMetadataEntity<?> entity, boolean isAdd) {
+            this.entity = entity;
+            this.isAdd = isAdd;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 59911d1..5d44d0b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -42,6 +42,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
@@ -779,10 +780,16 @@ public class MetadataManager implements IMetadataManager {
 
     @Override
     public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
-        Feed feed;
+        Feed feed = null;
+        List<FeedConnection> feedConnections = null;
         try {
             feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName);
+            feedConnections = metadataNode.getFeedConnections(ctx.getJobId(), dataverse, feedName);
             metadataNode.dropFeed(ctx.getJobId(), dataverse, feedName);
+            for (FeedConnection feedConnection : feedConnections) {
+                metadataNode.dropFeedConnection(ctx.getJobId(), dataverse, feedName, feedConnection.getDatasetName());
+                ctx.dropFeedConnection(dataverse, feedName, feedConnection.getDatasetName());
+            }
         } catch (RemoteException e) {
             throw new MetadataException(e);
         }
@@ -800,6 +807,48 @@ public class MetadataManager implements IMetadataManager {
     }
 
     @Override
+    public void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection)
+            throws MetadataException {
+        try {
+            metadataNode.addFeedConnection(ctx.getJobId(), feedConnection);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+        ctx.addFeedConnection(feedConnection);
+    }
+
+    @Override
+    public void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
+            String datasetName) throws MetadataException {
+        try {
+            metadataNode.dropFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+        ctx.dropFeedConnection(dataverseName, feedName, datasetName);
+    }
+
+    @Override
+    public FeedConnection getFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
+            String datasetName) throws MetadataException {
+        try {
+            return metadataNode.getFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
+    public List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String dataverseName, String feedName)
+            throws MetadataException {
+        try {
+            return metadataNode.getFeedConnections(ctx.getJobId(), dataverseName, feedName);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
     public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse)
             throws MetadataException {
         List<DatasourceAdapter> dataverseAdapters;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 8ecc0ed..51790e6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -60,6 +60,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
@@ -73,6 +74,7 @@ import org.apache.asterix.metadata.entitytupletranslators.DatasourceAdapterTuple
 import org.apache.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
 import org.apache.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
 import org.apache.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator;
+import org.apache.asterix.metadata.entitytupletranslators.FeedConnectionTupleTranslator;
 import org.apache.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator;
 import org.apache.asterix.metadata.entitytupletranslators.FeedTupleTranslator;
 import org.apache.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
@@ -487,11 +489,16 @@ public class MetadataNode implements IMetadataNode {
             }
 
             List<Feed> dataverseFeeds;
+            List<FeedConnection> feedConnections;
             Feed feed;
             dataverseFeeds = getDataverseFeeds(jobId, dataverseName);
-            // Drop all datasets in this dataverse.
+            // Drop all feeds&connections in this dataverse.
             for (int i = 0; i < dataverseFeeds.size(); i++) {
                 feed = dataverseFeeds.get(i);
+                feedConnections = getFeedConnections(jobId, dataverseName, feed.getFeedName());
+                for (FeedConnection feedConnection : feedConnections) {
+                    dropFeedConnection(jobId, dataverseName, feed.getFeedName(), feedConnection.getDatasetName());
+                }
                 dropFeed(jobId, dataverseName, feed.getFeedName());
             }
 
@@ -1480,6 +1487,63 @@ public class MetadataNode implements IMetadataNode {
     }
 
     @Override
+    public void addFeedConnection(JobId jobId, FeedConnection feedConnection) throws MetadataException {
+        try {
+            FeedConnectionTupleTranslator tupleReaderWriter = new FeedConnectionTupleTranslator(true);
+            ITupleReference feedConnTuple = tupleReaderWriter.getTupleFromMetadataEntity(feedConnection);
+            insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, feedConnTuple);
+        } catch (IndexException | ACIDException | IOException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
+    public List<FeedConnection> getFeedConnections(JobId jobId, String dataverseName, String feedName)
+            throws MetadataException {
+        try {
+            ITupleReference searchKey = createTuple(dataverseName, feedName);
+            FeedConnectionTupleTranslator tupleReaderWriter = new FeedConnectionTupleTranslator(false);
+            List<FeedConnection> results = new ArrayList<>();
+            IValueExtractor<FeedConnection> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            searchIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, searchKey, valueExtractor, results);
+            return results;
+        } catch (IndexException | IOException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
+    public FeedConnection getFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName)
+            throws MetadataException {
+        try {
+            ITupleReference searchKey = createTuple(dataverseName, feedName, datasetName);
+            FeedConnectionTupleTranslator tupleReaderWriter = new FeedConnectionTupleTranslator(false);
+            List<FeedConnection> results = new ArrayList<>();
+            IValueExtractor<FeedConnection> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            searchIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, searchKey, valueExtractor, results);
+            if (!results.isEmpty()) {
+                return results.get(0);
+            }
+            return null;
+        } catch (IndexException | IOException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
+    public void dropFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName)
+            throws MetadataException {
+        try {
+            ITupleReference searchKey = createTuple(dataverseName, feedName, datasetName);
+            ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET,
+                    searchKey);
+            deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, tuple);
+        } catch (IndexException | IOException | ACIDException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    @Override
     public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException {
         try {
             // Insert into the 'Feed' dataset.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 87c1c473..b2ec7f2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -24,14 +24,13 @@ import java.util.ArrayList;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
-import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.api.IFeed.FeedType;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
@@ -231,17 +230,26 @@ public class MetadataTransactionContext extends MetadataCache {
     public void addFeed(Feed feed) {
         droppedCache.dropFeed(feed);
         logAndApply(new MetadataLogicalOperation(feed, true));
-
     }
 
-    public void dropFeed(String dataverseName, String feedName, IFeed.FeedType feedType) {
+    public void dropFeed(String dataverseName, String feedName) {
         Feed feed = null;
-        feed = new Feed(dataverseName, feedName, null, feedType, (feedType == FeedType.PRIMARY) ? feedName : null,
-                null, null);
+        feed = new Feed(dataverseName, feedName, null, null);
         droppedCache.addFeedIfNotExists(feed);
         logAndApply(new MetadataLogicalOperation(feed, false));
     }
 
+    public void addFeedConnection(FeedConnection feedConnection) {
+        droppedCache.dropFeedConnection(feedConnection);
+        logAndApply(new MetadataLogicalOperation(feedConnection, true));
+    }
+
+    public void dropFeedConnection(String dataverseName, String feedName, String datasetName) {
+        FeedConnection feedConnection = new FeedConnection(dataverseName, feedName, datasetName, null, null, null);
+        droppedCache.addFeedConnectionIfNotExists(feedConnection);
+        logAndApply(new MetadataLogicalOperation(feedConnection, false));
+    }
+
     @Override
     public void clear() {
         super.clear();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index feb4db0..bd1c7d1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -34,6 +34,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
@@ -431,7 +432,6 @@ public interface IMetadataManager extends IMetadataBootstrap {
     void dropAdapter(MetadataTransactionContext ctx, String dataverseName, String name) throws MetadataException;
 
     /**
-     *
      * @param ctx
      *            MetadataTransactionContext of an active metadata transaction.
      * @param dataverseName
@@ -691,4 +691,19 @@ public interface IMetadataManager extends IMetadataBootstrap {
      * rebind it
      */
     void rebindMetadataNode();
+
+    /**
+     * Feed Connection Related Metadata operations
+     */
+    void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection)
+            throws MetadataException;
+
+    void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
+            String datasetName) throws MetadataException;
+
+    FeedConnection getFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
+            String datasetName) throws MetadataException;
+
+    List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String dataverseName, String feedName)
+            throws MetadataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 41d0b6a..21b170d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -35,6 +35,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
@@ -764,4 +765,14 @@ public interface IMetadataNode extends Remote, Serializable {
     <T extends IExtensionMetadataEntity> List<T> getEntities(JobId jobId, IExtensionMetadataSearchKey searchKey)
             throws MetadataException, RemoteException;
 
+    void addFeedConnection(JobId jobId, FeedConnection feedConnection) throws MetadataException, RemoteException;
+
+    FeedConnection getFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName)
+            throws MetadataException, RemoteException;
+
+    void dropFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName)
+            throws MetadataException, RemoteException;
+
+    List<FeedConnection> getFeedConnections(JobId jobId, String dataverseName, String feedName)
+            throws MetadataException, RemoteException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 6cd1f8b..02a092d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -123,7 +123,7 @@ public class MetadataBootstrap {
                     MetadataPrimaryIndexes.FUNCTION_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
                     MetadataPrimaryIndexes.FEED_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET,
                     MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
-                    MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET };
+                    MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET };
 
     private MetadataBootstrap() {
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 833f3e5..d2a4b1c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -53,8 +53,8 @@ public class MetadataPrimaryIndexes {
             new MetadataIndexImmutableProperties("Library", 9, 9);
     public static final MetadataIndexImmutableProperties PROPERTIES_FEED =
             new MetadataIndexImmutableProperties("Feed", 10, 10);
-    public static final MetadataIndexImmutableProperties PROPERTIES_FEED_ACTIVITY_DATASET_ID =
-            new MetadataIndexImmutableProperties("FeedActivity", 11, 11);
+    public static final MetadataIndexImmutableProperties PROPERTIES_FEED_CONNECTION =
+            new MetadataIndexImmutableProperties("FeedConnection", 11, 11);
     public static final MetadataIndexImmutableProperties PROPERTIES_FEED_POLICY =
             new MetadataIndexImmutableProperties("FeedPolicy", 12, 12);
     public static final MetadataIndexImmutableProperties PROPERTIES_COMPACTION_POLICY =
@@ -129,6 +129,13 @@ public class MetadataPrimaryIndexes {
                     Arrays.asList(MetadataRecordTypes.FIELD_NAME_FILE_NUMBER)),
             0, MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, true, new int[] { 0, 1, 2 });
 
+    public static final IMetadataIndex FEED_CONNECTION_DATASET = new MetadataIndex(PROPERTIES_FEED_CONNECTION, 4,
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+            Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                    Arrays.asList(MetadataRecordTypes.FIELD_NAME_FEED_NAME),
+                    Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATASET_NAME)),
+            0, MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE, true, new int[] { 0, 1, 2 });
+
     private MetadataPrimaryIndexes() {
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index a783c63..2a04b58 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -61,7 +61,6 @@ public final class MetadataRecordTypes {
     public static final String FIELD_NAME_FILE_NUMBER = "FileNumber";
     public static final String FIELD_NAME_FILE_SIZE = "FileSize";
     public static final String FIELD_NAME_FILE_STRUCTURE = "FileStructure";
-    public static final String FIELD_NAME_FUNCTION = "Function";
     public static final String FIELD_NAME_GROUP_NAME = "GroupName";
     public static final String FIELD_NAME_HINTS = "Hints";
     public static final String FIELD_NAME_INDEX_NAME = "IndexName";
@@ -87,13 +86,10 @@ public final class MetadataRecordTypes {
     public static final String FIELD_NAME_PENDING_OP = "PendingOp";
     public static final String FIELD_NAME_POLICY_NAME = "PolicyName";
     public static final String FIELD_NAME_PRIMARY_KEY = "PrimaryKey";
-    public static final String FIELD_NAME_PRIMARY_TYPE_DETAILS = "PrimaryTypeDetails";
     public static final String FIELD_NAME_PROPERTIES = "Properties";
     public static final String FIELD_NAME_RECORD = "Record";
     public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
     public static final String FIELD_NAME_SEARCH_KEY = "SearchKey";
-    public static final String FIELD_NAME_SECONDARY_TYPE_DETAILS = "SecondaryTypeDetails";
-    public static final String FIELD_NAME_SOURCE_FEED_NAME = "SourceFeedName";
     public static final String FIELD_NAME_STATUS = "Status";
     public static final String FIELD_NAME_TAG = "Tag";
     public static final String FIELD_NAME_TIMESTAMP = "Timestamp";
@@ -102,6 +98,7 @@ public final class MetadataRecordTypes {
     public static final String FIELD_NAME_UNORDERED_LIST = "UnorderedList";
     public static final String FIELD_NAME_VALUE = "Value";
     public static final String FIELD_NAME_WORKING_MEMORY_SIZE = "WorkingMemorySize";
+    public static final String FIELD_NAME_APPLIED_FUNCTIONS = "AppliedFunctions";
 
     //---------------------------------- Record Types Creation ----------------------------------//
     //--------------------------------------- Properties ----------------------------------------//
@@ -148,35 +145,6 @@ public final class MetadataRecordTypes {
                     BuiltinType.ADATETIME, BuiltinType.AINT32 },
             //IsOpen?
             true);
-    //-------------------------------------- Feed Details ---------------------------------------//
-    public static final int FEED_DETAILS_ARECORD_FILESTRUCTURE_FIELD_INDEX = 0;
-    public static final int FEED_DETAILS_ARECORD_PARTITIONSTRATEGY_FIELD_INDEX = 1;
-    public static final int FEED_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX = 2;
-    public static final int FEED_DETAILS_ARECORD_PRIMARYKEY_FIELD_INDEX = 3;
-    public static final int FEED_DETAILS_ARECORD_GROUPNAME_FIELD_INDEX = 4;
-    public static final int FEED_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX = 5;
-    public static final int FEED_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX = 6;
-    public static final int FEED_DETAILS_ARECORD_FUNCTION_FIELD_INDEX = 7;
-    public static final int FEED_DETAILS_ARECORD_STATE_FIELD_INDEX = 8;
-    public static final int FEED_DETAILS_ARECORD_COMPACTION_POLICY_FIELD_INDEX = 9;
-    public static final int FEED_DETAILS_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX = 10;
-    public static final ARecordType FEED_DETAILS_RECORDTYPE = createRecordType(
-            // RecordTypeName
-            null,
-            // FieldNames
-            new String[] { FIELD_NAME_FILE_STRUCTURE, FIELD_NAME_PARTITIONING_STRATEGY, FIELD_NAME_PARTITIONING_KEY,
-                    FIELD_NAME_PRIMARY_KEY, FIELD_NAME_GROUP_NAME, FIELD_NAME_DATASOURCE_ADAPTER, FIELD_NAME_PROPERTIES,
-                    FIELD_NAME_FUNCTION, FIELD_NAME_STATUS, FIELD_NAME_COMPACTION_POLICY,
-                    FIELD_NAME_COMPACTION_POLICY_PROPERTIES },
-            // FieldTypes
-            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, new AOrderedListType(BuiltinType.ASTRING, null),
-                    new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    new AOrderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null),
-                    AUnionType.createUnknownableType(BuiltinType.ASTRING), BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    new AOrderedListType(COMPACTION_POLICY_PROPERTIES_RECORDTYPE, null) },
-            //IsOpen?
-            true);
-
     //---------------------------------------- Dataset ------------------------------------------//
     public static final String RECORD_NAME_DATASET = "DatasetRecordType";
     public static final int DATASET_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
@@ -383,60 +351,45 @@ public final class MetadataRecordTypes {
                     BuiltinType.ASTRING },
             //IsOpen?
             true);
-    //---------------------------------- Primary Feed Details -----------------------------------//
-    public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_NAME_FIELD_INDEX = 0;
-    public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 1;
-    public static final ARecordType PRIMARY_FEED_DETAILS_RECORDTYPE = createRecordType(
-            // RecordTypeName
-            null,
-            // FieldNames
-            new String[] { FIELD_NAME_ADAPTER_NAME, FIELD_NAME_ADAPTER_CONFIGURATION },
-            // FieldTypes
-            new IAType[] { BuiltinType.ASTRING,
-                    new AUnorderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null) },
-            //IsOpen?
-            true);
-    //--------------------------------- Secondary Feed Details ----------------------------------//
-    public static final int FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX = 0;
-    public static final ARecordType SECONDARY_FEED_DETAILS_RECORDTYPE = createRecordType(
-            // RecordTypeName
-            null,
-            // FieldNames
-            new String[] { FIELD_NAME_SOURCE_FEED_NAME },
-            // FieldTypes
-            new IAType[] { BuiltinType.ASTRING },
-            //IsOpen?
-            true);
-    //---------------------------------------- Feed Activity ------------------------------------//
+
+    //---------------------------------------- Feed Details ------------------------------------//
     public static final String RECORD_NAME_FEED = "FeedRecordType";
-    public static final int FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
-    public static final int FEED_ACTIVITY_ARECORD_FEED_NAME_FIELD_INDEX = 1;
-    public static final int FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX = 2;
-    public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX = 3;
-    public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 4;
-    public static final int FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX = 5;
-    public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6;
     public static final int FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
     public static final int FEED_ARECORD_FEED_NAME_FIELD_INDEX = 1;
-    public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 2;
-    public static final int FEED_ARECORD_FEED_TYPE_FIELD_INDEX = 3;
-    public static final int FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX = 4;
-    public static final int FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX = 5;
-    public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
-    public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX = 0;
-    public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX = 1;
-    public static final int FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX = 0;
+    public static final int FEED_ARECORD_ADAPTOR_NAME_INDEX = 2;
+    public static final int FEED_ARECORD_ADAPTOR_CONFIG_INDEX = 3;
+    public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 4;
     public static final ARecordType FEED_RECORDTYPE = createRecordType(
             // RecordTypeName
             RECORD_NAME_FEED,
             // FieldNames
-            new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME, FIELD_NAME_FUNCTION, FIELD_NAME_FEED_TYPE,
-                    FIELD_NAME_PRIMARY_TYPE_DETAILS, FIELD_NAME_SECONDARY_TYPE_DETAILS, FIELD_NAME_TIMESTAMP },
+            new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME, FIELD_NAME_ADAPTER_NAME,
+                    FIELD_NAME_ADAPTER_CONFIGURATION, FIELD_NAME_TIMESTAMP },
             // FieldTypes
-            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    AUnionType.createUnknownableType(BuiltinType.ASTRING), BuiltinType.ASTRING,
-                    AUnionType.createUnknownableType(PRIMARY_FEED_DETAILS_RECORDTYPE),
-                    AUnionType.createUnknownableType(SECONDARY_FEED_DETAILS_RECORDTYPE), BuiltinType.ASTRING },
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    new AUnorderedListType(FEED_ADAPTER_CONFIGURATION_RECORDTYPE, null), BuiltinType.ASTRING },
+            //IsOpen?
+            true);
+
+    //------------------------------------- Feed Connection ---------------------------------------//
+    public static final String RECORD_NAME_FEED_CONNECTION = "FeedConnectionRecordType";
+    public static final int FEED_CONN_DATAVERSE_NAME_FIELD_INDEX = 0;
+    public static final int FEED_CONN_FEED_NAME_FIELD_INDEX = 1;
+    public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2;
+    public static final int FEED_CONN_OUTPUT_TYPE_INDEX = 3;
+    public static final int FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX = 4;
+    public static final int FEED_CONN_POLICY_FIELD_INDEX = 5;
+
+
+    public static final ARecordType FEED_CONNECTION_RECORDTYPE = createRecordType(
+            // RecordTypeName
+            RECORD_NAME_FEED_CONNECTION,
+            // FieldNames
+            new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME, FIELD_NAME_DATASET_NAME,
+                    FIELD_NAME_RETURN_TYPE, FIELD_NAME_APPLIED_FUNCTIONS, FIELD_NAME_POLICY_NAME },
+            // FieldTypes
+            new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    new AUnorderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING},
             //IsOpen?
             true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 0d3d06d..26cec1e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -28,6 +28,7 @@ import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.om.types.ARecordType;
@@ -54,29 +55,29 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
 
     private final Feed feed;
     private final EntityId sourceFeedId;
-    private final IFeed.FeedType sourceFeedType;
     private final FeedRuntimeType location;
     private final String targetDataset;
     private final String[] locations;
     private final int computeCardinality;
     private final List<IAType> pkTypes;
     private final List<ScalarFunctionCallExpression> keyAccessExpression;
+    private final FeedConnection feedConnection;
 
     public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType,
             List<IAType> pkTypes, List<List<String>> partitioningKeys,
             List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId,
-            IFeed.FeedType sourceFeedType, FeedRuntimeType location, String[] locations, INodeDomain domain)
+            FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection)
             throws AlgebricksException {
         super(id, itemType, metaType, Type.FEED, domain);
         this.feed = feed;
         this.targetDataset = targetDataset;
         this.sourceFeedId = sourceFeedId;
-        this.sourceFeedType = sourceFeedType;
         this.location = location;
         this.locations = locations;
         this.pkTypes = pkTypes;
         this.keyAccessExpression = keyAccessExpression;
         this.computeCardinality = ClusterStateManager.INSTANCE.getParticipantNodes().size();
+        this.feedConnection = feedConnection;
         initFeedDataSource();
     }
 
@@ -120,10 +121,6 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
         }
     }
 
-    public IFeed.FeedType getSourceFeedType() {
-        return sourceFeedType;
-    }
-
     public int getComputeCardinality() {
         return computeCardinality;
     }
@@ -196,7 +193,7 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
             FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDataverseName(),
                     getId().getDatasourceName(), getTargetDataset());
             FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
-                    getSourceFeedId(), feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation());
+                    feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation());
 
             return new Pair<>(feedCollector, new AlgebricksAbsolutePartitionConstraint(getLocations()));
 
@@ -209,4 +206,8 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
     public boolean isScanAccessPathALeaf() {
         return true;
     }
+
+    public FeedConnection getFeedConnection() {
+        return feedConnection;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index f1a90c7..b647bb7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -29,6 +29,7 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.NodeGroup;
@@ -132,6 +133,15 @@ public class MetadataManagerUtil {
         }
     }
 
+    public static FeedConnection findFeedConnection(MetadataTransactionContext mdTxnCtx, String dataverse,
+            String feedName, String datasetName) throws AlgebricksException {
+        try {
+            return MetadataManager.INSTANCE.getFeedConnection(mdTxnCtx, dataverse, feedName, datasetName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     public static FeedPolicyEntity findFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverse,
             String policyName) throws AlgebricksException {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1beaed0..f5c6d9a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -68,6 +68,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
@@ -329,6 +330,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
     }
 
+    public FeedConnection findFeedConnection(String dataverseName, String feedName, String datasetName)
+            throws AlgebricksException {
+        return MetadataManagerUtil.findFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
+    }
+
     public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
         return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index d55fde5..2e328f9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,6 +29,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -93,7 +94,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 /**
  * Metadata describing a dataset.
  */
-public class Dataset implements IMetadataEntity<Dataset> {
+public class Dataset implements IMetadataEntity<Dataset>, IDataset {
 
     /*
      * Constants
@@ -278,7 +279,9 @@ public class Dataset implements IMetadataEntity<Dataset> {
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityActive() && listener.isEntityUsingDataset(dataverseName, datasetName)) {
+                IDataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
+                        dataverseName, datasetName);
+                if (listener.isEntityUsingDataset(ds)) {
                     throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
                             RecordUtil.toFullyQualifiedName(dataverseName, datasetName),
                             listener.getEntityId().toString());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
index 1343e53..ea0e4eb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
@@ -22,7 +22,6 @@ package org.apache.asterix.metadata.entities;
 import java.util.Map;
 
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.metadata.MetadataCache;
 import org.apache.asterix.metadata.api.IMetadataEntity;
@@ -36,28 +35,18 @@ public class Feed implements IMetadataEntity<Feed>, IFeed {
 
     /** A unique identifier for the feed */
     private EntityId feedId;
-    /** The function that is to be applied on each incoming feed tuple **/
-    private FunctionSignature appliedFunction;
-    /** The type {@code FeedType} associated with the feed. **/
-    private IFeed.FeedType feedType;
     /** A string representation of the instance **/
     private String displayName;
     /** A string representation of the adapter name **/
     private String adapterName;
     /** Adapter configuration */
     private Map<String, String> adapterConfiguration;
-    /** Source primary feed */
-    private String sourceFeedName;
 
-    public Feed(String dataverseName, String feedName, FunctionSignature appliedFunction, IFeed.FeedType feedType,
-            String sourceFeedName, String adapterName, Map<String, String> configuration) {
+    public Feed(String dataverseName, String feedName,String adapterName, Map<String, String> configuration) {
         this.feedId = new EntityId(EXTENSION_NAME, dataverseName, feedName);
-        this.appliedFunction = appliedFunction;
-        this.feedType = feedType;
-        this.displayName = feedType + "(" + feedId + ")";
+        this.displayName = "(" + feedId + ")";
         this.adapterName = adapterName;
         this.adapterConfiguration = configuration;
-        this.sourceFeedName = sourceFeedName;
     }
 
     @Override
@@ -76,16 +65,6 @@ public class Feed implements IMetadataEntity<Feed>, IFeed {
     }
 
     @Override
-    public FunctionSignature getAppliedFunction() {
-        return appliedFunction;
-    }
-
-    @Override
-    public IFeed.FeedType getFeedType() {
-        return feedType;
-    }
-
-    @Override
     public boolean equals(Object other) {
         if (this == other) {
             return true;
@@ -104,7 +83,7 @@ public class Feed implements IMetadataEntity<Feed>, IFeed {
 
     @Override
     public String toString() {
-        return feedType + "(" + feedId + ")";
+        return feedId.toString();
     }
 
     @Override
@@ -126,8 +105,4 @@ public class Feed implements IMetadataEntity<Feed>, IFeed {
     public Map<String, String> getAdapterConfiguration() {
         return adapterConfiguration;
     }
-
-    public String getSourceFeedName() {
-        return sourceFeedName;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
new file mode 100644
index 0000000..b05e61e
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.entities;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.metadata.MetadataCache;
+import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Feed connection records the feed --> dataset mapping.
+ */
+public class FeedConnection implements IMetadataEntity<FeedConnection> {
+
+    private static final long serialVersionUID = 1L;
+
+    private EntityId feedId;
+    private String connectionId;
+    private String dataverseName;
+    private String feedName;
+    private String datasetName;
+    private String policyName;
+    private String outputType;
+    private List<FunctionSignature> appliedFunctions;
+
+    public FeedConnection(String dataverseName, String feedName, String datasetName,
+            List<FunctionSignature> appliedFunctions, String policyName, String outputType) {
+        this.dataverseName = dataverseName;
+        this.feedName = feedName;
+        this.datasetName = datasetName;
+        this.appliedFunctions = appliedFunctions;
+        this.connectionId = feedName + ":" + datasetName;
+        this.policyName = policyName;
+        this.outputType = outputType;
+        this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName);
+    }
+
+    public List<FunctionSignature> getAppliedFunctions() {
+        return appliedFunctions;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof FeedConnection)) {
+            return false;
+        }
+        return ((FeedConnection) other).getConnectionId().equals(connectionId);
+    }
+
+    @Override
+    public int hashCode() {
+        return connectionId.hashCode();
+    }
+
+    @Override
+    public FeedConnection addToCache(MetadataCache cache) {
+        return null;
+    }
+
+    @Override
+    public FeedConnection dropFromCache(MetadataCache cache) {
+        return null;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+
+    public String getOutputType() {
+        return outputType;
+    }
+
+    public EntityId getFeedId() {
+        return feedId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
new file mode 100644
index 0000000..e7fe5b4
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.entitytupletranslators;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.UnorderedListBuilder;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.om.base.*;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FeedConnectionTupleTranslator extends AbstractTupleTranslator<FeedConnection> {
+
+    public static final int FEED_CONN_DATAVERSE_NAME_FIELD_INDEX = 0;
+    public static final int FEED_CONN_FEED_NAME_FIELD_INDEX = 1;
+    public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2;
+
+    public static final int FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+    private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE);
+
+    public FeedConnectionTupleTranslator(boolean getTuple) {
+        super(getTuple, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET.getFieldCount());
+    }
+
+    @Override
+    public FeedConnection getMetadataEntityFromTuple(ITupleReference frameTuple) throws MetadataException, IOException {
+        byte[] serRecord = frameTuple.getFieldData(FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = frameTuple.getFieldStart(FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = frameTuple.getFieldLength(FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord feedConnRecord = recordSerDes.deserialize(in);
+        return createFeedConnFromRecord(feedConnRecord);
+    }
+
+    private FeedConnection createFeedConnFromRecord(ARecord feedConnRecord) {
+        String dataverseName = ((AString) feedConnRecord
+                .getValueByPos(MetadataRecordTypes.FEED_CONN_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+        String feedName = ((AString) feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_FEED_NAME_FIELD_INDEX))
+                .getStringValue();
+        String datasetName = ((AString) feedConnRecord
+                .getValueByPos(MetadataRecordTypes.FEED_CONN_DATASET_NAME_FIELD_INDEX)).getStringValue();
+        String outputType = ((AString) feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_OUTPUT_TYPE_INDEX))
+                .getStringValue();
+        String policyName = ((AString) feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX))
+                .getStringValue();
+        ArrayList<FunctionSignature> appliedFunctions = null;
+        Object o = feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX);
+        IACursor cursor;
+
+        if (!(o instanceof ANull) && !(o instanceof AMissing)) {
+            appliedFunctions = new ArrayList<>();
+            FunctionSignature functionSignature;
+            cursor = ((AUnorderedList) feedConnRecord
+                    .getValueByPos(MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX)).getCursor();
+            while (cursor.next()) {
+                //TODO: allow different arity
+                functionSignature = new FunctionSignature(dataverseName, ((AString) cursor.get()).getStringValue(), 1);
+                appliedFunctions.add(functionSignature);
+            }
+        }
+
+        return new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, outputType);
+    }
+
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(FeedConnection me) throws MetadataException, IOException {
+        tupleBuilder.reset();
+
+        // key: dataverse
+        aString.setValue(me.getDataverseName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        // key: feedName
+        aString.setValue(me.getFeedName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        // key: dataset
+        aString.setValue(me.getDatasetName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        recordBuilder.reset(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE);
+        // field dataverse
+        fieldValue.reset();
+        aString.setValue(me.getDataverseName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_CONN_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+        // field: feedId
+        fieldValue.reset();
+        aString.setValue(me.getFeedName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_CONN_FEED_NAME_FIELD_INDEX, fieldValue);
+
+        // field: dataset
+        fieldValue.reset();
+        aString.setValue(me.getDatasetName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_CONN_DATASET_NAME_FIELD_INDEX, fieldValue);
+
+        // field: outputType
+        fieldValue.reset();
+        aString.setValue(me.getOutputType());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_CONN_OUTPUT_TYPE_INDEX, fieldValue);
+
+        // field: appliedFunctions
+        fieldValue.reset();
+        writeAppliedFunctionsField(recordBuilder, me, fieldValue);
+
+        // field: policyName
+        fieldValue.reset();
+        aString.setValue(me.getPolicyName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX, fieldValue);
+
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+
+    private void writeAppliedFunctionsField(IARecordBuilder rb, FeedConnection fc, ArrayBackedValueStorage buffer)
+            throws HyracksDataException {
+        UnorderedListBuilder listBuilder = new UnorderedListBuilder();
+        ArrayBackedValueStorage listEleBuffer = new ArrayBackedValueStorage();
+
+        listBuilder.reset((AUnorderedListType) MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE
+                .getFieldTypes()[MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX]);
+        if (fc.getAppliedFunctions() != null) {
+            List<FunctionSignature> appliedFunctions = fc.getAppliedFunctions();
+            for (FunctionSignature af : appliedFunctions) {
+                aString.setValue(af.getName());
+                stringSerde.serialize(aString, listEleBuffer.getDataOutput());
+                listBuilder.addItem(listEleBuffer);
+            }
+        }
+        listBuilder.write(buffer.getDataOutput(), true);
+        rb.addField(MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX, buffer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
index dc0b9c9..4503e09 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
@@ -29,19 +29,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.api.IFeed.FeedType;
+import org.apache.asterix.builders.UnorderedListBuilder;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.om.base.AMissing;
 import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.base.AUnorderedList;
@@ -67,8 +62,8 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
     public static final int FEED_PAYLOAD_TUPLE_FIELD_INDEX = 2;
 
     @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<ARecord> recordSerDes =
-            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.FEED_RECORDTYPE);
+    private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(MetadataRecordTypes.FEED_RECORDTYPE);
 
     protected FeedTupleTranslator(boolean getTuple) {
         super(getTuple, MetadataPrimaryIndexes.FEED_DATASET.getFieldCount());
@@ -86,65 +81,30 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
     }
 
     private Feed createFeedFromARecord(ARecord feedRecord) {
-        Feed feed = null;
-        String dataverseName =
-                ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX))
-                        .getStringValue();
+        Feed feed;
+        String dataverseName = ((AString) feedRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
         String feedName = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_NAME_FIELD_INDEX))
                 .getStringValue();
 
-        Object o = feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX);
-        FunctionSignature signature = null;
-        if (!(o instanceof ANull) && !(o instanceof AMissing)) {
-            String functionName = ((AString) o).getStringValue();
-            signature = new FunctionSignature(dataverseName, functionName, 1);
+        AUnorderedList feedConfig = (AUnorderedList) feedRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_CONFIG_INDEX);
+        String adapterName = ((AString) feedRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_NAME_INDEX)).getStringValue();
+
+        IACursor cursor = feedConfig.getCursor();
+
+        // restore configurations
+        String key;
+        String value;
+        Map<String, String> adaptorConfiguration = new HashMap<>();
+        while (cursor.next()) {
+            ARecord field = (ARecord) cursor.get();
+            key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
+            value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
+            adaptorConfiguration.put(key, value);
         }
-
-        String feedType = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX))
-                .getStringValue();
-
-        IFeed.FeedType feedTypeEnum = IFeed.FeedType.valueOf(feedType.toUpperCase());
-        switch (feedTypeEnum) {
-            case PRIMARY: {
-                ARecord feedTypeDetailsRecord = (ARecord) feedRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX);
-                String adapterName = ((AString) feedTypeDetailsRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX))
-                                .getStringValue();
-
-                IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord.getValueByPos(
-                        MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX))
-                                .getCursor();
-                String key;
-                String value;
-                Map<String, String> adaptorConfiguration = new HashMap<String, String>();
-                while (cursor.next()) {
-                    ARecord field = (ARecord) cursor.get();
-                    key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
-                            .getStringValue();
-                    value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
-                            .getStringValue();
-                    adaptorConfiguration.put(key, value);
-                }
-                feed = new Feed(dataverseName, feedName, signature, FeedType.PRIMARY, feedName, adapterName,
-                        adaptorConfiguration);
-
-            }
-                break;
-            case SECONDARY: {
-                ARecord feedTypeDetailsRecord = (ARecord) feedRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX);
-
-                String sourceFeedName = ((AString) feedTypeDetailsRecord
-                        .getValueByPos(MetadataRecordTypes.FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX))
-                                .getStringValue();
-
-                feed = new Feed(dataverseName, feedName, signature, FeedType.SECONDARY, sourceFeedName, null, null);
-
-            }
-                break;
-        }
-
+        feed = new Feed(dataverseName, feedName, adapterName, adaptorConfiguration);
         return feed;
     }
 
@@ -162,37 +122,29 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
 
         recordBuilder.reset(MetadataRecordTypes.FEED_RECORDTYPE);
 
-        // write field 0
+        // write dataverse name
         fieldValue.reset();
         aString.setValue(feed.getDataverseName());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
 
-        // write field 1
+        // write feed name
         fieldValue.reset();
         aString.setValue(feed.getFeedName());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FEED_NAME_FIELD_INDEX, fieldValue);
 
-        // write field 2
+        // adaptor name
         fieldValue.reset();
-        if (feed.getAppliedFunction() != null) {
-            aString.setValue(feed.getAppliedFunction().getName());
-            stringSerde.serialize(aString, fieldValue.getDataOutput());
-            recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
-        }
-
-        // write field 3
-        fieldValue.reset();
-        aString.setValue(feed.getFeedType().name().toUpperCase());
+        aString.setValue(feed.getAdapterName());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX, fieldValue);
+        recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_NAME_INDEX, fieldValue);
 
-        // write field 4/5
+        // write adaptor configuration
         fieldValue.reset();
-        writeFeedTypeDetailsRecordType(recordBuilder, feed, fieldValue);
+        writeFeedAdaptorField(recordBuilder, feed, fieldValue);
 
-        // write field 6
+        // write timestamp
         fieldValue.reset();
         aString.setValue(Calendar.getInstance().getTime().toString());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
@@ -206,81 +158,32 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
         return tuple;
     }
 
-    @SuppressWarnings("unchecked")
-    private void writeFeedTypeDetailsRecordType(IARecordBuilder recordBuilder, Feed feed,
-            ArrayBackedValueStorage fieldValue) throws HyracksDataException {
-
-        switch (feed.getFeedType()) {
-            case PRIMARY: {
-
-                IARecordBuilder primaryDetailsRecordBuilder = new RecordBuilder();
-                OrderedListBuilder listBuilder = new OrderedListBuilder();
-                ArrayBackedValueStorage primaryRecordfieldValue = new ArrayBackedValueStorage();
-                ArrayBackedValueStorage primaryRecordItemValue = new ArrayBackedValueStorage();
-                primaryDetailsRecordBuilder.reset(MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE);
-
-                AMutableString aString = new AMutableString("");
-                ISerializerDeserializer<AString> stringSerde =
-                        SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
-
-                // write field 0
-                fieldValue.reset();
-                aString.setValue(feed.getAdapterName());
-                stringSerde.serialize(aString, primaryRecordfieldValue.getDataOutput());
-                primaryDetailsRecordBuilder.addField(
-                        MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX,
-                        primaryRecordfieldValue);
-
-                // write field 1
-                listBuilder.reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE
-                        .getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]);
-                for (Map.Entry<String, String> property : feed.getAdapterConfiguration().entrySet()) {
-                    String name = property.getKey();
-                    String value = property.getValue();
-                    primaryRecordItemValue.reset();
-                    writePropertyTypeRecord(name, value, primaryRecordItemValue.getDataOutput());
-                    listBuilder.addItem(primaryRecordItemValue);
-                }
-                primaryRecordfieldValue.reset();
-                listBuilder.write(primaryRecordfieldValue.getDataOutput(), true);
-                primaryDetailsRecordBuilder.addField(
-                        MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX,
-                        primaryRecordfieldValue);
-
-                primaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true);
-
-                recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX, fieldValue);
-            }
-                break;
-
-            case SECONDARY:
-                IARecordBuilder secondaryDetailsRecordBuilder = new RecordBuilder();
-                ArrayBackedValueStorage secondaryFieldValue = new ArrayBackedValueStorage();
-                secondaryDetailsRecordBuilder.reset(MetadataRecordTypes.SECONDARY_FEED_DETAILS_RECORDTYPE);
-
-                // write field 0
-                fieldValue.reset();
-                aString.setValue(feed.getSourceFeedName());
-                stringSerde.serialize(aString, secondaryFieldValue.getDataOutput());
-                secondaryDetailsRecordBuilder.addField(
-                        MetadataRecordTypes.FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX,
-                        secondaryFieldValue);
-
-                secondaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true);
-                recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX, fieldValue);
-                break;
+    private void writeFeedAdaptorField(IARecordBuilder recordBuilder, Feed feed,
+            ArrayBackedValueStorage fieldValueBuffer) throws HyracksDataException {
+        UnorderedListBuilder listBuilder = new UnorderedListBuilder();
+        ArrayBackedValueStorage listEleBuffer = new ArrayBackedValueStorage();
+
+        listBuilder.reset((AUnorderedListType) MetadataRecordTypes.FEED_RECORDTYPE
+                .getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_ADAPTOR_CONFIG_INDEX]);
+        for (Map.Entry<String, String> property : feed.getAdapterConfiguration().entrySet()) {
+            String name = property.getKey();
+            String value = property.getValue();
+            listEleBuffer.reset();
+            writePropertyTypeRecord(name, value, listEleBuffer.getDataOutput());
+            listBuilder.addItem(listEleBuffer);
         }
-
+        listBuilder.write(fieldValueBuffer.getDataOutput(), true);
+        recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_CONFIG_INDEX, fieldValueBuffer);
     }
 
     @SuppressWarnings("unchecked")
     public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
         IARecordBuilder propertyRecordBuilder = new RecordBuilder();
         ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        propertyRecordBuilder.reset(MetadataRecordTypes.FEED_ADAPTER_CONFIGURATION_RECORDTYPE);
+        propertyRecordBuilder.reset(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE);
         AMutableString aString = new AMutableString("");
-        ISerializerDeserializer<AString> stringSerde =
-                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+        ISerializerDeserializer<AString> stringSerde = SerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(BuiltinType.ASTRING);
 
         // write field 0
         fieldValue.reset();


Mime
View raw message