Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7B017175B9 for ; Mon, 29 Jun 2015 19:46:56 +0000 (UTC) Received: (qmail 20955 invoked by uid 500); 29 Jun 2015 19:46:56 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 20922 invoked by uid 500); 29 Jun 2015 19:46:56 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 20865 invoked by uid 99); 29 Jun 2015 19:46:56 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Jun 2015 19:46:56 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 29 Jun 2015 19:43:13 +0000 Received: (qmail 7755 invoked by uid 99); 29 Jun 2015 19:44:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Jun 2015 19:44:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 06466E35DD; Mon, 29 Jun 2015 19:44:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjaco002@apache.org To: commits@asterixdb.incubator.apache.org Date: Mon, 29 Jun 2015 19:45:04 -0000 Message-Id: <6b3897c540a644b49cbb5d7887084690@git.apache.org> In-Reply-To: <5b12a571b4c242a99d375806e33e0bed@git.apache.org> References: <5b12a571b4c242a99d375806e33e0bed@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/24] incubator-asterixdb git commit: Introduces Feeds 2.0 X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java index 56fbcb5..ec727a9 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java @@ -21,7 +21,6 @@ import java.rmi.RemoteException; import java.util.List; import edu.uci.ics.asterix.common.exceptions.ACIDException; -import edu.uci.ics.asterix.common.feeds.FeedConnectionId; import edu.uci.ics.asterix.common.functions.FunctionSignature; import edu.uci.ics.asterix.common.transactions.JobId; import edu.uci.ics.asterix.metadata.MetadataException; @@ -32,8 +31,6 @@ import edu.uci.ics.asterix.metadata.entities.Datatype; import edu.uci.ics.asterix.metadata.entities.Dataverse; import edu.uci.ics.asterix.metadata.entities.ExternalFile; import edu.uci.ics.asterix.metadata.entities.Feed; -import edu.uci.ics.asterix.metadata.entities.FeedActivity; -import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType; import edu.uci.ics.asterix.metadata.entities.FeedPolicy; import edu.uci.ics.asterix.metadata.entities.Function; import edu.uci.ics.asterix.metadata.entities.Index; @@ -497,16 +494,6 @@ public interface IMetadataNode extends Remote, Serializable { public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) throws MetadataException, RemoteException; - public FeedActivity getRecentFeedActivity(JobId jobId, FeedConnectionId feedId, - FeedActivityType... feedActivityFilter) throws MetadataException, RemoteException; - - /** - * @param jobId - * @throws MetadataException - * @throws RemoteException - */ - public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException; - /** * @param jobId * @param dataverse @@ -559,16 +546,7 @@ public interface IMetadataNode extends Remote, Serializable { */ public void dropFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException; - /** - * @param jobId - * A globally unique id for an active metadata transaction. - * @param feedId - * A unique id for the feed - * @param feedActivity - */ - public void registerFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivity feedActivity) - throws MetadataException, RemoteException; - + /** * @param jobId * @param feedPolicy @@ -588,17 +566,7 @@ public interface IMetadataNode extends Remote, Serializable { public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException, RemoteException; - /** - * @param jobId - * @param dataverse - * @param dataset - * @return - * @throws MetadataException - * @throws RemoteException - */ - public List getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException, - RemoteException; - + /** * Removes a library , acquiring local locks on behalf of the given * transaction id. @@ -667,16 +635,28 @@ public interface IMetadataNode extends Remote, Serializable { public List getDataverseFeeds(JobId jobId, String dataverseName) throws MetadataException, RemoteException; /** + * delete a give feed (ingestion) policy + * * @param jobId * @param dataverseName - * @param deedName + * @param policyName + * @return + * @throws RemoteException + * @throws MetadataException + */ + public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException, + RemoteException; + + /** + * @param jobId + * @param dataverse * @return * @throws MetadataException * @throws RemoteException */ - public List getDatasetsServedByFeed(JobId jobId, String dataverseName, String deedName) - throws MetadataException, RemoteException; - + public List getDataversePolicies(JobId jobId, String dataverse) throws MetadataException, + RemoteException; + /** * @param jobId * A globally unique id for an active metadata transaction. http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java index c93c29e..0ac211e 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -57,9 +57,9 @@ import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.FileStructur import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; import edu.uci.ics.asterix.metadata.entities.Node; import edu.uci.ics.asterix.metadata.entities.NodeGroup; +import edu.uci.ics.asterix.metadata.external.IAdapterFactory; import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier; import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies; -import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory; import edu.uci.ics.asterix.om.types.BuiltinType; import edu.uci.ics.asterix.om.types.IAType; import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat; @@ -123,7 +123,7 @@ public class MetadataBootstrap { MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET, MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_DATASET, - MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET, + MetadataPrimaryIndexes.FEED_POLICY_DATASET, MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET }; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java index 7eae5cd..04e56b1 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java @@ -15,7 +15,6 @@ package edu.uci.ics.asterix.metadata.bootstrap; -import java.util.ArrayList; import java.util.Arrays; import edu.uci.ics.asterix.metadata.MetadataException; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java index 30ae9fa..b726ed6 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java @@ -50,19 +50,21 @@ public final class MetadataRecordTypes { public static ARecordType FUNCTION_RECORDTYPE; public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE; public static ARecordType FEED_RECORDTYPE; + public static ARecordType PRIMARY_FEED_DETAILS_RECORDTYPE; + public static ARecordType SECONDARY_FEED_DETAILS_RECORDTYPE; public static ARecordType FEED_ADAPTER_CONFIGURATION_RECORDTYPE; public static ARecordType FEED_ACTIVITY_RECORDTYPE; public static ARecordType FEED_POLICY_RECORDTYPE; public static ARecordType POLICY_PARAMS_RECORDTYPE; - public static ARecordType FEED_ACTIVITY_DETAILS_RECORDTYPE; public static ARecordType LIBRARY_RECORDTYPE; public static ARecordType COMPACTION_POLICY_RECORDTYPE; public static ARecordType EXTERNAL_FILE_RECORDTYPE; /** * Create all metadata record types. + * @throws HyracksDataException */ - public static void init() throws MetadataException { + public static void init() throws MetadataException, HyracksDataException { // Attention: The order of these calls is important because some types // depend on other types being created first. // These calls are one "dependency chain". @@ -90,10 +92,10 @@ public final class MetadataRecordTypes { FUNCTION_RECORDTYPE = createFunctionRecordType(); DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType(); - FEED_RECORDTYPE = createFeedRecordType(); FEED_ADAPTER_CONFIGURATION_RECORDTYPE = createPropertiesRecordType(); - FEED_ACTIVITY_DETAILS_RECORDTYPE = createPropertiesRecordType(); - FEED_ACTIVITY_RECORDTYPE = createFeedActivityRecordType(); + PRIMARY_FEED_DETAILS_RECORDTYPE = createPrimaryFeedDetailsRecordType(); + SECONDARY_FEED_DETAILS_RECORDTYPE = createSecondaryFeedDetailsRecordType(); + FEED_RECORDTYPE = createFeedRecordType(); FEED_POLICY_RECORDTYPE = createFeedPolicyRecordType(); LIBRARY_RECORDTYPE = createLibraryRecordType(); @@ -498,50 +500,67 @@ public final class MetadataRecordTypes { public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 4; public static final int FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX = 5; public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6; - - private static ARecordType createFeedActivityRecordType() throws AsterixException { - AUnorderedListType unorderedPropertyListType = new AUnorderedListType(FEED_ACTIVITY_DETAILS_RECORDTYPE, null); - String[] fieldNames = { "DataverseName", "FeedName", "DatasetName", "ActivityId", "ActivityType", "Details", - "Timestamp" }; - IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32, - BuiltinType.ASTRING, unorderedPropertyListType, BuiltinType.ASTRING }; - try { - return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true); - } catch (HyracksDataException e) { - throw new AsterixException(e); - } - } + public static final int FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0; public static final int FEED_ARECORD_FEED_NAME_FIELD_INDEX = 1; - public static final int FEED_ARECORD_ADAPTER_NAME_FIELD_INDEX = 2; - public static final int FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 3; - public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 4; - public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 5; + public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 2; + public static final int FEED_ARECORD_FEED_TYPE_FIELD_INDEX = 3; + public static final int FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX = 4; + public static final int FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX = 5; + public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 6; - private static ARecordType createFeedRecordType() throws AsterixException { + + public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX = 0; + public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX = 1; - AUnorderedListType unorderedAdapterPropertyListType = new AUnorderedListType( - DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null); + public static final int FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX = 0; + + private static ARecordType createFeedRecordType() throws AsterixException, HyracksDataException { List feedFunctionUnionList = new ArrayList(); feedFunctionUnionList.add(BuiltinType.ANULL); feedFunctionUnionList.add(BuiltinType.ASTRING); AUnionType feedFunctionUnion = new AUnionType(feedFunctionUnionList, null); - String[] fieldNames = { "DataverseName", "FeedName", "AdapterName", "AdapterConfiguration", "Function", - "Timestamp" }; - IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, - unorderedAdapterPropertyListType, feedFunctionUnion, BuiltinType.ASTRING }; + List primaryFeedTypeDetailsRecordUnionList = new ArrayList(); + primaryFeedTypeDetailsRecordUnionList.add(BuiltinType.ANULL); + primaryFeedTypeDetailsRecordUnionList.add(PRIMARY_FEED_DETAILS_RECORDTYPE); + AUnionType primaryRecordUnion = new AUnionType(primaryFeedTypeDetailsRecordUnionList, null); - try { - return new ARecordType("FeedRecordType", fieldNames, fieldTypes, true); - } catch (HyracksDataException e) { - throw new AsterixException(e); - } + List secondaryFeedTypeDetailsRecordUnionList = new ArrayList(); + secondaryFeedTypeDetailsRecordUnionList.add(BuiltinType.ANULL); + secondaryFeedTypeDetailsRecordUnionList.add(SECONDARY_FEED_DETAILS_RECORDTYPE); + AUnionType secondaryRecordUnion = new AUnionType(secondaryFeedTypeDetailsRecordUnionList, null); + + String[] fieldNames = { "DataverseName", "FeedName", "Function", "FeedType", "PrimaryTypeDetails", + "SecondaryTypeDetails", "Timestamp" }; + IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, feedFunctionUnion, BuiltinType.ASTRING, + primaryRecordUnion, secondaryRecordUnion, BuiltinType.ASTRING }; + return new ARecordType("FeedRecordType", fieldNames, fieldTypes, true); } + public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_NAME_FIELD_INDEX = 0; + public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 1; + + private static final ARecordType createPrimaryFeedDetailsRecordType() throws AsterixException, HyracksDataException { + AUnorderedListType unorderedAdaptorPropertyListType = new AUnorderedListType( + DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null); + + String[] fieldNames = { "AdapterName", "AdapterConfiguration" }; + IAType[] fieldTypes = { BuiltinType.ASTRING, unorderedAdaptorPropertyListType }; + return new ARecordType(null, fieldNames, fieldTypes, true); + } + + public static final int FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX = 0; + + private static final ARecordType createSecondaryFeedDetailsRecordType() throws AsterixException, HyracksDataException { + String[] fieldNames = { "SourceFeedName" }; + IAType[] fieldTypes = { BuiltinType.ASTRING }; + return new ARecordType(null, fieldNames, fieldTypes, true); + } + public static final int LIBRARY_ARECORD_DATAVERSENAME_FIELD_INDEX = 0; public static final int LIBRARY_ARECORD_NAME_FIELD_INDEX = 1; public static final int LIBRARY_ARECORD_TIMESTAMP_FIELD_INDEX = 2; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java index 6948dbc..037b04d 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java @@ -16,8 +16,8 @@ package edu.uci.ics.asterix.metadata.cluster; import java.util.concurrent.atomic.AtomicInteger; -import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber; -import edu.uci.ics.asterix.metadata.api.IClusterManagementWork; +import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber; +import edu.uci.ics.asterix.common.api.IClusterManagementWork; public abstract class AbstractClusterManagementWork implements IClusterManagementWork { @@ -35,6 +35,8 @@ public abstract class AbstractClusterManagementWork implements IClusterManagemen this.workId = WorkIdGenerator.getNextWorkId(); } + + private static class WorkIdGenerator { private static AtomicInteger workId = new AtomicInteger(0); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java index 68dcc4c..1157562 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java @@ -1,27 +1,49 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package edu.uci.ics.asterix.metadata.cluster; -import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber; +import java.util.Set; + +import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber; public class AddNodeWork extends AbstractClusterManagementWork { - private final int numberOfNodes; + private final int numberOfNodesRequested; + private final Set deadNodes; @Override public WorkType getClusterManagementWorkType() { return WorkType.ADD_NODE; } - public AddNodeWork(int numberOfNodes, IClusterEventsSubscriber subscriber) { + public AddNodeWork(Set deadNodes, int numberOfNodesRequested, IClusterEventsSubscriber subscriber) { super(subscriber); - this.numberOfNodes = numberOfNodes; + this.deadNodes = deadNodes; + this.numberOfNodesRequested = numberOfNodesRequested; + } + + public int getNumberOfNodesRequested() { + return numberOfNodesRequested; } - public int getNumberOfNodes() { - return numberOfNodes; + public Set getDeadNodes() { + return deadNodes; } @Override public String toString() { - return WorkType.ADD_NODE + " " + numberOfNodes + " requested by " + subscriber; + return WorkType.ADD_NODE + " " + numberOfNodesRequested + " requested by " + subscriber; } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java index d578a77..7f3b575 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java @@ -1,6 +1,7 @@ package edu.uci.ics.asterix.metadata.cluster; -import edu.uci.ics.asterix.metadata.api.IClusterManagementWork; +import edu.uci.ics.asterix.common.api.IClusterManagementWork; +import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse; public class ClusterManagementWorkResponse implements IClusterManagementWorkResponse { @@ -13,6 +14,7 @@ public class ClusterManagementWorkResponse implements IClusterManagementWorkResp this.status = Status.IN_PROGRESS; } + @Override public IClusterManagementWork getWork() { return work; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java index fe7f4a4..47ca953 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java @@ -25,6 +25,7 @@ import java.util.logging.Logger; import javax.xml.bind.JAXBContext; import javax.xml.bind.Unmarshaller; +import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber; import edu.uci.ics.asterix.common.config.AsterixExternalProperties; import edu.uci.ics.asterix.common.exceptions.AsterixException; import edu.uci.ics.asterix.event.management.AsterixEventServiceClient; @@ -39,7 +40,6 @@ import edu.uci.ics.asterix.event.service.ILookupService; import edu.uci.ics.asterix.event.service.ServiceProvider; import edu.uci.ics.asterix.event.util.PatternCreator; import edu.uci.ics.asterix.installer.schema.conf.Configuration; -import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber; import edu.uci.ics.asterix.metadata.api.IClusterManager; import edu.uci.ics.asterix.om.util.AsterixAppContextInfo; import edu.uci.ics.asterix.om.util.AsterixClusterProperties; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java deleted file mode 100644 index dfc88ac..0000000 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java +++ /dev/null @@ -1,28 +0,0 @@ -package edu.uci.ics.asterix.metadata.cluster; - -import edu.uci.ics.asterix.metadata.api.IClusterManagementWork; - -public interface IClusterManagementWorkResponse { - - public enum Status { - IN_PROGRESS, - SUCCESS, - FAILURE - } - - /** - * @return - */ - public IClusterManagementWork getWork(); - - /** - * @return - */ - public Status getStatus(); - - /** - * @param status - */ - public void setStatus(Status status); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java index 90683d1..8b4f2aa 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java @@ -2,7 +2,7 @@ package edu.uci.ics.asterix.metadata.cluster; import java.util.Set; -import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber; +import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber; public class RemoveNodeWork extends AbstractClusterManagementWork { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java index ef23135..5df2c95 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java @@ -43,8 +43,7 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartition public abstract class AqlDataSource implements IDataSource { private final AqlSourceId id; - private final String datasourceDataverse; - private final String datasourceName; + private final IAType itemType; private final AqlDataSourceType datasourceType; protected IAType[] schemaTypes; protected INodeDomain domain; @@ -58,19 +57,18 @@ public abstract class AqlDataSource implements IDataSource { } public AqlDataSource(AqlSourceId id, String datasourceDataverse, String datasourceName, - AqlDataSourceType datasourceType) throws AlgebricksException { + IAType itemType, AqlDataSourceType datasourceType) throws AlgebricksException { this.id = id; - this.datasourceDataverse = datasourceDataverse; - this.datasourceName = datasourceName; + this.itemType = itemType; this.datasourceType = datasourceType; } public String getDatasourceDataverse() { - return datasourceDataverse; + return id.getDataverseName(); } public String getDatasourceName() { - return datasourceName; + return id.getDatasourceName(); } @Override @@ -196,7 +194,10 @@ public abstract class AqlDataSource implements IDataSource { public Map getProperties() { return properties; } - + + public IAType getItemType() { + return itemType; + } public void setProperties(Map properties) { this.properties = properties; } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java index 5055cea..0ec098d 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java @@ -18,6 +18,7 @@ package edu.uci.ics.asterix.metadata.declared; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -37,7 +38,12 @@ import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOp import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor; import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo; import edu.uci.ics.asterix.common.exceptions.AsterixException; +import edu.uci.ics.asterix.common.feeds.FeedActivity; +import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails; import edu.uci.ics.asterix.common.feeds.FeedConnectionId; +import edu.uci.ics.asterix.common.feeds.FeedConstants; +import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor; +import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager; import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; @@ -65,23 +71,19 @@ import edu.uci.ics.asterix.metadata.entities.Dataverse; import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails; import edu.uci.ics.asterix.metadata.entities.ExternalFile; import edu.uci.ics.asterix.metadata.entities.Feed; -import edu.uci.ics.asterix.metadata.entities.FeedActivity; -import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails; import edu.uci.ics.asterix.metadata.entities.FeedPolicy; import edu.uci.ics.asterix.metadata.entities.Index; import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails; +import edu.uci.ics.asterix.metadata.entities.PrimaryFeed; +import edu.uci.ics.asterix.metadata.external.IAdapterFactory; +import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation; import edu.uci.ics.asterix.metadata.external.IndexingConstants; import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies; -import edu.uci.ics.asterix.metadata.feeds.EndFeedMessage; import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor; +import edu.uci.ics.asterix.metadata.feeds.FeedCollectOperatorDescriptor; import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor; -import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor; import edu.uci.ics.asterix.metadata.feeds.FeedUtil; -import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory; -import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation; -import edu.uci.ics.asterix.metadata.feeds.IFeedMessage; -import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory; -import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory; +import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory; import edu.uci.ics.asterix.metadata.utils.DatasetUtils; import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry; import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions; @@ -187,6 +189,7 @@ public class AqlMetadataProvider implements IMetadataProvider buildFeedCollectRuntime(JobSpecification jobSpec, + IDataSource dataSource) throws AlgebricksException { + + FeedDataSource feedDataSource = (FeedDataSource) dataSource; + FeedCollectOperatorDescriptor feedCollector = null; + + try { + ARecordType feedOutputType = (ARecordType) feedDataSource.getItemType(); + ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider() + .getSerializerDeserializer(feedOutputType); + RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde }); + + FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get( + BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY); + if (feedPolicy == null) { + throw new AlgebricksException("Feed not configured with a policy"); + } + feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName()); + FeedConnectionId feedConnectionId = new FeedConnectionId(feedDataSource.getId().getDataverseName(), + feedDataSource.getId().getDatasourceName(), feedDataSource.getTargetDataset()); + feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId, + feedDataSource.getSourceFeedId(), (ARecordType) feedOutputType, feedDesc, + feedPolicy.getProperties(), feedDataSource.getLocation()); + + return new Pair(feedCollector, + determineLocationConstraint(feedDataSource)); + + } catch (Exception e) { + throw new AlgebricksException(e); + } +} + +private AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) + throws AsterixException { + String[] locationArray = null; + String locations = null;; + switch (feedDataSource.getSourceFeedType()) { + case PRIMARY: + switch (feedDataSource.getLocation()) { + case SOURCE_FEED_COMPUTE_STAGE: + if (feedDataSource.getFeed().getFeedId().equals(feedDataSource.getSourceFeedId())) { + locationArray = feedDataSource.getLocations(); + } else { + Collection activities = centralFeedManager.getFeedLoadManager() + .getFeedActivities(); + Iterator it = activities.iterator(); + FeedActivity activity = null; + while (it.hasNext()) { + activity = it.next(); + if (activity.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse()) + && activity.getFeedName() + .equals(feedDataSource.getSourceFeedId().getFeedName())) { + locations = activity.getFeedActivityDetails().get( + FeedActivityDetails.COMPUTE_LOCATIONS); + locationArray = locations.split(","); + break; + } + } + } + break; + case SOURCE_FEED_INTAKE_STAGE: + locationArray = feedDataSource.getLocations(); + break; + } + break; + case SECONDARY: + Collection activities = centralFeedManager.getFeedLoadManager().getFeedActivities(); + Iterator it = activities.iterator(); + FeedActivity activity = null; + while (it.hasNext()) { + activity = it.next(); + if (activity.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse()) + && activity.getFeedName().equals(feedDataSource.getSourceFeedId().getFeedName())) { + switch (feedDataSource.getLocation()) { + case SOURCE_FEED_INTAKE_STAGE: + locations = activity.getFeedActivityDetails() + .get(FeedActivityDetails.COLLECT_LOCATIONS); + break; + case SOURCE_FEED_COMPUTE_STAGE: + locations = activity.getFeedActivityDetails() + .get(FeedActivityDetails.COMPUTE_LOCATIONS); + break; + } + break; + } + } + + if (locations != null) { + locationArray = locations.split(","); + } else { + String message = "Unable to discover location(s) for source feed data hand-off " + + feedDataSource.getSourceFeedId(); + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.severe(message); + } + throw new AsterixException(message); + } + break; + } + AlgebricksAbsolutePartitionConstraint locationConstraint = new AlgebricksAbsolutePartitionConstraint( + locationArray); + return locationConstraint; +} private Pair buildLoadableDatasetScan(JobSpecification jobSpec, LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated, List> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException { @@ -411,7 +517,7 @@ public class AqlMetadataProvider implements IMetadataProvider configuration, IAType itemType, boolean isPKAutoGenerated, - List> primaryKeys) throws AlgebricksException { + Map configuration, IAType itemType, boolean isPKAutoGenerated, List> primaryKeys) + throws AlgebricksException { IAdapterFactory adapterFactory; DatasourceAdapter adapterEntity; String adapterFactoryClassname; @@ -458,6 +564,8 @@ public class AqlMetadataProvider implements IMetadataProvider(scanner, apc); } - @SuppressWarnings("rawtypes") - public Pair buildFeedIntakeRuntime(JobSpecification jobSpec, - IDataSource dataSource) throws AlgebricksException { - - FeedDataSource feedDataSource = (FeedDataSource) dataSource; + public Triple buildFeedIntakeRuntime( + JobSpecification jobSpec, PrimaryFeed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception { + Triple factoryOutput = null; + factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx); + IFeedAdapterFactory adapterFactory = factoryOutput.first; FeedIntakeOperatorDescriptor feedIngestor = null; - Triple factoryOutput = null; - AlgebricksPartitionConstraint constraint = null; - - try { - factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx); - IAdapterFactory adapterFactory = factoryOutput.first; - ARecordType adapterOutputType = factoryOutput.second; - AdapterType adapterType = factoryOutput.third; - - ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider() - .getSerializerDeserializer(adapterOutputType); - RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde }); - - FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get( - BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY); - if (feedPolicy == null) { - throw new AlgebricksException("Feed not configured with a policy"); - } - feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName()); - switch (adapterType) { - case INTERNAL: - feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId( - feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource - .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType, - feedDesc, feedPolicy.getProperties()); - break; - case EXTERNAL: - String libraryName = feedDataSource.getFeed().getAdapterName().split("#")[0]; - feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(), - libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed() - .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties()); - break; - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Cofigured feed intake operator with " + adapterType + " adapter"); - } - constraint = factoryOutput.first.getPartitionConstraint(); - } catch (Exception e) { - throw new AlgebricksException(e); + switch (factoryOutput.third) { + case INTERNAL: + feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory, + factoryOutput.second, policyAccessor); + break; + case EXTERNAL: + String libraryName = primaryFeed.getAdaptorName().trim() + .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0]; + feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName, adapterFactory + .getClass().getName(), factoryOutput.second, policyAccessor); + break; } - return new Pair(feedIngestor, constraint); - } - - public Pair buildSendFeedMessageRuntime( - JobSpecification jobSpec, String dataverse, String feedName, String dataset, IFeedMessage feedMessage, - String[] locations) throws AlgebricksException { - AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations); - FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, feedName, - dataset, feedMessage); - return new Pair(feedMessenger, partitionConstraint); - } - public Pair buildDisconnectFeedMessengerRuntime( - JobSpecification jobSpec, String dataverse, String feedName, String dataset, FeedActivity feedActivity) - throws AlgebricksException { - List feedLocations = new ArrayList(); - String[] ingestLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS) - .split(","); - for (String loc : ingestLocs) { - feedLocations.add(loc); - } - FeedConnectionId feedId = new FeedConnectionId(dataverse, feedName, dataset); - String[] locations = feedLocations.toArray(new String[] {}); - IFeedMessage feedMessage = new EndFeedMessage(feedId); - return buildSendFeedMessageRuntime(jobSpec, dataverse, feedName, dataset, feedMessage, locations); + AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint(); + return new Triple(feedIngestor, + partitionConstraint, adapterFactory); } + public Pair buildBtreeRuntime(JobSpecification jobSpec, List outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, @@ -934,7 +987,7 @@ public class AqlMetadataProvider implements IMetadataProvider additionalNonKeyFields, JobGenContext context, JobSpecification spec) throws AlgebricksException { String dataverseName = dataSource.getId().getDataverseName(); - String datasetName = dataSource.getId().getDatasetName(); + String datasetName = dataSource.getId().getDatasourceName(); Dataset dataset = findDataset(dataverseName, datasetName); if (dataset == null) { @@ -1055,7 +1108,7 @@ public class AqlMetadataProvider implements IMetadataProvider 0) { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java index 4cee35d..be9f0e2 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java @@ -20,23 +20,23 @@ import java.io.File; public class AqlSourceId { private String dataverseName; - private String datasetName; + private String datasourceName; - public AqlSourceId(String dataverseName, String datasetName) { + public AqlSourceId(String dataverseName, String datasourceName) { this.dataverseName = dataverseName; - this.datasetName = datasetName; + this.datasourceName = datasourceName; } @Override public String toString() { - return dataverseName + File.pathSeparator + datasetName; + return dataverseName + File.pathSeparator + datasourceName; } public String getDataverseName() { return dataverseName; } - public String getDatasetName() { - return datasetName; + public String getDatasourceName() { + return datasourceName; } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java index 059a10c..fd06e99 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java @@ -21,7 +21,7 @@ public class DatasetDataSource extends AqlDataSource { public DatasetDataSource(AqlSourceId id, String datasourceDataverse, String datasourceName, IAType itemType, AqlDataSourceType datasourceType) throws AlgebricksException { - super(id, datasourceDataverse, datasourceName, datasourceType); + super(id, datasourceDataverse, datasourceName, itemType, datasourceType); MetadataTransactionContext ctx = null; try { ctx = MetadataManager.INSTANCE.beginTransaction(); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java index 695ae31..44402fc 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java @@ -14,32 +14,42 @@ */ package edu.uci.ics.asterix.metadata.declared; -import edu.uci.ics.asterix.common.feeds.FeedConnectionId; +import edu.uci.ics.asterix.common.feeds.FeedId; +import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation; import edu.uci.ics.asterix.metadata.MetadataManager; import edu.uci.ics.asterix.metadata.MetadataTransactionContext; import edu.uci.ics.asterix.metadata.entities.Feed; +import edu.uci.ics.asterix.metadata.entities.Feed.FeedType; import edu.uci.ics.asterix.om.types.IAType; +import edu.uci.ics.asterix.om.util.AsterixClusterProperties; import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain; public class FeedDataSource extends AqlDataSource { private Feed feed; - private final FeedConnectionId feedConnectionId; + private final FeedId sourceFeedId; + private final FeedType sourceFeedType; + private final ConnectionLocation location; + private final String targetDataset; + private final String[] locations; + private final int computeCardinality; - public FeedDataSource(AqlSourceId id, FeedConnectionId feedId, IAType itemType, AqlDataSourceType dataSourceType) + public FeedDataSource(AqlSourceId id, String targetDataset, IAType itemType, AqlDataSourceType dataSourceType, + FeedId sourceFeedId, FeedType sourceFeedType, ConnectionLocation location, String[] locations) throws AlgebricksException { - super(id, feedId.getDataverse(), feedId.getFeedName(), dataSourceType); - this.feedConnectionId = feedId; - feed = null; + super(id, id.getDataverseName(), id.getDatasourceName(), itemType, dataSourceType); + this.targetDataset = targetDataset; + this.sourceFeedId = sourceFeedId; + this.sourceFeedType = sourceFeedType; + this.location = location; + this.locations = locations; + this.computeCardinality = AsterixClusterProperties.INSTANCE.getParticipantNodes().size(); MetadataTransactionContext ctx = null; try { MetadataManager.INSTANCE.acquireReadLatch(); ctx = MetadataManager.INSTANCE.beginTransaction(); - feed = MetadataManager.INSTANCE.getFeed(ctx, feedId.getDataverse(), feedId.getFeedName()); - if (feed == null) { - throw new AlgebricksException("Unknown feed " + feedId); - } + this.feed = MetadataManager.INSTANCE.getFeed(ctx, id.getDataverseName(), id.getDatasourceName()); MetadataManager.INSTANCE.commitTransaction(ctx); initFeedDataSource(itemType); } catch (Exception e) { @@ -71,8 +81,20 @@ public class FeedDataSource extends AqlDataSource { return domain; } - public FeedConnectionId getFeedConnectionId() { - return feedConnectionId; + public String getTargetDataset() { + return targetDataset; + } + + public FeedId getSourceFeedId() { + return sourceFeedId; + } + + public ConnectionLocation getLocation() { + return location; + } + + public String[] getLocations() { + return locations; } private void initFeedDataSource(IAType itemType) { @@ -91,4 +113,12 @@ public class FeedDataSource extends AqlDataSource { }; domain = domainForExternalData; } + + public FeedType getSourceFeedType() { + return sourceFeedType; + } + + public int getComputeCardinality() { + return computeCardinality; + } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java index 5701292..f4be491 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java @@ -4,8 +4,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import edu.uci.ics.asterix.common.exceptions.AsterixException; +import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter; import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer; -import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter; import edu.uci.ics.asterix.om.types.ARecordType; import edu.uci.ics.asterix.om.types.ATypeTag; import edu.uci.ics.asterix.om.types.IAType; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java index cbf771b..fd4b6a7 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java @@ -1,7 +1,10 @@ package edu.uci.ics.asterix.metadata.declared; -import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory; -import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter; +import java.util.Map; + +import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter; +import edu.uci.ics.asterix.metadata.external.IAdapterFactory; +import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation; import edu.uci.ics.asterix.om.types.ARecordType; import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; @@ -40,11 +43,7 @@ public class FieldExtractingAdapterFactory implements IAdapterFactory { return "FieldExtractingAdapter[ " + wrappedAdapterFactory.getName() + " ]"; } - @Override - public AdapterType getAdapterType() { - return wrappedAdapterFactory.getAdapterType(); - } - + @Override public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { return wrappedAdapterFactory.getPartitionConstraint(); @@ -55,5 +54,15 @@ public class FieldExtractingAdapterFactory implements IAdapterFactory { IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition); return new FieldExtractingAdapter(ctx, inRecDesc, outRecDesc, extractFields, rType, wrappedAdapter); } + + @Override + public void configure(Map configuration, ARecordType outputType) throws Exception { + wrappedAdapterFactory.configure(configuration, outputType); + } + + @Override + public ARecordType getAdapterOutputType() { + return wrappedAdapterFactory.getAdapterOutputType(); + } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java index 2dc840e..c91e3f6 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java @@ -51,7 +51,7 @@ public class LoadableDataSource extends AqlDataSource { public LoadableDataSource(Dataset targetDataset, IAType itemType, String adapter, Map properties) throws AlgebricksException, IOException { - super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source", + super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source", itemType, AqlDataSourceType.LOADABLE); this.targetDataset = targetDataset; this.adapter = adapter; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java index 0938ccc..862092a 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java @@ -18,7 +18,7 @@ import java.nio.ByteBuffer; import java.util.List; import edu.uci.ics.asterix.builders.RecordBuilder; -import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter; +import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter; import edu.uci.ics.asterix.om.base.AMutableUUID; import edu.uci.ics.asterix.om.base.AUUID; import edu.uci.ics.asterix.om.pointables.ARecordPointable; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java index e371b2b..0a36ea8 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java @@ -14,8 +14,10 @@ */ package edu.uci.ics.asterix.metadata.declared; -import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory; -import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter; +import java.util.Map; + +import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter; +import edu.uci.ics.asterix.metadata.external.IAdapterFactory; import edu.uci.ics.asterix.om.types.ARecordType; import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; @@ -58,11 +60,6 @@ public class PKGeneratingAdapterFactory implements IAdapterFactory { } @Override - public AdapterType getAdapterType() { - return wrappedAdapterFactory.getAdapterType(); - } - - @Override public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { return wrappedAdapterFactory.getPartitionConstraint(); } @@ -72,4 +69,14 @@ public class PKGeneratingAdapterFactory implements IAdapterFactory { IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition); return new PKGeneratingAdapter(ctx, inRecDesc, outRecDesc, inRecType, outRecType, wrappedAdapter, pkIndex); } + + @Override + public void configure(Map configuration, ARecordType outputType) throws Exception { + wrappedAdapterFactory.configure(configuration, outputType); + } + + @Override + public ARecordType getAdapterOutputType() { + return wrappedAdapterFactory.getAdapterOutputType(); + } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java index 39aa8ab..53b75a6 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java @@ -15,62 +15,67 @@ package edu.uci.ics.asterix.metadata.entities; -import java.util.Map; - +import edu.uci.ics.asterix.common.feeds.FeedId; import edu.uci.ics.asterix.common.functions.FunctionSignature; import edu.uci.ics.asterix.metadata.MetadataCache; import edu.uci.ics.asterix.metadata.api.IMetadataEntity; /** - * Metadata describing a feed. + * Feed POJO */ public class Feed implements IMetadataEntity { private static final long serialVersionUID = 1L; - private final String dataverseName; - private final String feedName; - private final String adapterName; - private final Map adapterConfiguration; - private final FunctionSignature appliedFunction; - - public Feed(String dataverseName, String datasetName, String adapterName, Map adapterConfiguration, - FunctionSignature appliedFunction) { - this.dataverseName = dataverseName; - this.feedName = datasetName; - this.adapterName = adapterName; - this.adapterConfiguration = adapterConfiguration; - this.appliedFunction = appliedFunction; + /** A unique identifier for the feed */ + protected final FeedId feedId; + + /** The function that is to be applied on each incoming feed tuple **/ + protected final FunctionSignature appliedFunction; + + /** The type {@code FeedType} associated with the feed. **/ + protected final FeedType feedType; + + /** A string representation of the instance **/ + protected final String displayName; + + public enum FeedType { + /** + * A feed that derives its data from an external source. + */ + PRIMARY, + + /** + * A feed that derives its data from another primary or secondary feed. + */ + SECONDARY } - public String getDataverseName() { - return dataverseName; + public Feed(String dataverseName, String datasetName, FunctionSignature appliedFunction, FeedType feedType) { + this.feedId = new FeedId(dataverseName, datasetName); + this.appliedFunction = appliedFunction; + this.feedType = feedType; + this.displayName = feedType + "(" + feedId + ")"; } - public String getFeedName() { - return feedName; + public FeedId getFeedId() { + return feedId; } - public String getAdapterName() { - return adapterName; + public String getDataverseName() { + return feedId.getDataverse(); } - public Map getAdapterConfiguration() { - return adapterConfiguration; + public String getFeedName() { + return feedId.getFeedName(); } public FunctionSignature getAppliedFunction() { return appliedFunction; } - @Override - public Object addToCache(MetadataCache cache) { - return cache.addFeedIfNotExists(this); - } - - @Override - public Object dropFromCache(MetadataCache cache) { - return cache.dropFeed(this); + public FeedType getFeedType() { + return feedType; } @Override @@ -81,13 +86,27 @@ public class Feed implements IMetadataEntity { if (!(other instanceof Feed)) { return false; } - Feed otherDataset = (Feed) other; - if (!otherDataset.dataverseName.equals(dataverseName)) { - return false; - } - if (!otherDataset.feedName.equals(feedName)) { - return false; - } - return true; + Feed otherFeed = (Feed) other; + return otherFeed.getFeedId().equals(feedId); + } + + @Override + public int hashCode() { + return displayName.hashCode(); + } + + @Override + public String toString() { + return feedType + "(" + feedId + ")"; + } + + @Override + public Object addToCache(MetadataCache cache) { + return cache.addFeedIfNotExists(this); + } + + @Override + public Object dropFromCache(MetadataCache cache) { + return cache.dropFeed(this); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java deleted file mode 100644 index 679276f..0000000 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright 2009-2010 by The Regents of the University of California - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * you may obtain a copy of the License from - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package edu.uci.ics.asterix.metadata.entities; - -import java.util.Map; - -import edu.uci.ics.asterix.metadata.MetadataCache; -import edu.uci.ics.asterix.metadata.api.IMetadataEntity; - -/** - * Metadata describing a feed activity record. - */ -public class FeedActivity implements IMetadataEntity, Comparable { - - private static final long serialVersionUID = 1L; - - private int activityId; - - private final String dataverseName; - private final String datasetName; - private final String feedName; - - private String lastUpdatedTimestamp; - private FeedActivityType activityType; - private Map feedActivityDetails; - - public static enum FeedActivityType { - FEED_BEGIN, - FEED_FAILURE, - FEED_END - } - - public static class FeedActivityDetails { - public static final String COMPUTE_LOCATIONS = "compute-locations"; - public static final String INGEST_LOCATIONS = "ingest-locations"; - public static final String STORAGE_LOCATIONS = "storage-locations"; - public static final String TOTAL_INGESTED = "total-ingested"; - public static final String INGESTION_RATE = "ingestion-rate"; - public static final String EXCEPTION_LOCATION = "exception-location"; - public static final String EXCEPTION_MESSAGE = "exception-message"; - public static final String FEED_POLICY_NAME = "feed-policy-name"; - public static final String SUPER_FEED_MANAGER_HOST = "super-feed-manager-host"; - public static final String SUPER_FEED_MANAGER_PORT = "super-feed-manager-port"; - public static final String FEED_NODE_FAILURE = "feed-node-failure"; - - } - - public FeedActivity(String dataverseName, String feedName, String datasetName, FeedActivityType feedActivityType, - Map feedActivityDetails) { - this.dataverseName = dataverseName; - this.feedName = feedName; - this.datasetName = datasetName; - this.activityType = feedActivityType; - this.feedActivityDetails = feedActivityDetails; - } - - public String getDataverseName() { - return dataverseName; - } - - public String getDatasetName() { - return datasetName; - } - - public String getFeedName() { - return feedName; - } - - @Override - public Object addToCache(MetadataCache cache) { - return cache.addFeedActivityIfNotExists(this); - } - - @Override - public Object dropFromCache(MetadataCache cache) { - return cache.dropFeedActivity(this); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof FeedActivity)) { - return false; - } - - if (!((FeedActivity) other).dataverseName.equals(dataverseName)) { - return false; - } - if (!((FeedActivity) other).datasetName.equals(datasetName)) { - return false; - } - if (!((FeedActivity) other).getFeedName().equals(feedName)) { - return false; - } - if (!((FeedActivity) other).getFeedActivityType().equals(activityType)) { - return false; - } - if (((FeedActivity) other).getActivityId() != (activityId)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return toString().hashCode(); - } - - @Override - public String toString() { - return dataverseName + "." + feedName + " --> " + datasetName + " " + activityType + " " + activityId; - } - - public FeedActivityType getFeedActivityType() { - return activityType; - } - - public void setFeedActivityType(FeedActivityType feedActivityType) { - this.activityType = feedActivityType; - } - - public String getLastUpdatedTimestamp() { - return lastUpdatedTimestamp; - } - - public void setLastUpdatedTimestamp(String lastUpdatedTimestamp) { - this.lastUpdatedTimestamp = lastUpdatedTimestamp; - } - - public int getActivityId() { - return activityId; - } - - public void setActivityId(int activityId) { - this.activityId = activityId; - } - - public Map getFeedActivityDetails() { - return feedActivityDetails; - } - - public void setFeedActivityDetails(Map feedActivityDetails) { - this.feedActivityDetails = feedActivityDetails; - } - - public FeedActivityType getActivityType() { - return activityType; - } - - public void setActivityType(FeedActivityType activityType) { - this.activityType = activityType; - } - - @Override - public int compareTo(FeedActivity o) { - return o.getActivityId() - this.activityId; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java new file mode 100644 index 0000000..62f0d07 --- /dev/null +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java @@ -0,0 +1,76 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.uci.ics.asterix.metadata.entities; + +import java.util.Map; +import java.util.Map.Entry; + +import edu.uci.ics.asterix.common.functions.FunctionSignature; +import edu.uci.ics.asterix.metadata.api.IMetadataEntity; + +/** + * A primary feed is one that derives its data from an external source via an adaptor. + * This class is a holder object for the metadata associated with a primary feed. + */ +public class PrimaryFeed extends Feed implements IMetadataEntity { + + private static final long serialVersionUID = 1L; + + private final String adaptorName; + private final Map adaptorConfiguration; + + public PrimaryFeed(String dataverseName, String datasetName, String adaptorName, + Map adaptorConfiguration, FunctionSignature appliedFunction) { + super(dataverseName, datasetName, appliedFunction, FeedType.PRIMARY); + this.adaptorName = adaptorName; + this.adaptorConfiguration = adaptorConfiguration; + } + + public String getAdaptorName() { + return adaptorName; + } + + public Map getAdaptorConfiguration() { + return adaptorConfiguration; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!super.equals(other) || !(other instanceof PrimaryFeed)) { + return false; + } + + PrimaryFeed otherFeed = (PrimaryFeed) other; + if (!otherFeed.getAdaptorName().equals(adaptorName)) { + return false; + } + + for (Entry entry : adaptorConfiguration.entrySet()) { + if (!(entry.getValue().equals(otherFeed.getAdaptorConfiguration().get(entry.getKey())))) { + return false; + } + } + return true; + } + + @Override + public String toString() { + return "PrimaryFeed (" + adaptorName + ")"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java new file mode 100644 index 0000000..c1f51ba --- /dev/null +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java @@ -0,0 +1,60 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.uci.ics.asterix.metadata.entities; + +import edu.uci.ics.asterix.common.functions.FunctionSignature; +import edu.uci.ics.asterix.metadata.api.IMetadataEntity; + +/** + * A secondary feed is one that derives its data from another (primary/secondary) feed. + * This class is a holder object for the metadata associated with a secondary feed. + */ +public class SecondaryFeed extends Feed implements IMetadataEntity { + + private static final long serialVersionUID = 1L; + + private final String sourceFeedName; + + public SecondaryFeed(String dataverseName, String feedName, String sourceFeedName, FunctionSignature appliedFunction) { + super(dataverseName, feedName, appliedFunction, FeedType.SECONDARY); + this.sourceFeedName = sourceFeedName; + } + + public String getSourceFeedName() { + return sourceFeedName; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!super.equals(other) || !(other instanceof SecondaryFeed)) { + return false; + } + + SecondaryFeed otherFeed = (SecondaryFeed) other; + if (!otherFeed.getSourceFeedName().equals(sourceFeedName)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "SecondaryFeed (" + feedId + ")" + "<--" + "(" + sourceFeedName + ")"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java index 1bb34d2..580881d 100644 --- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java +++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java @@ -85,7 +85,7 @@ public class DatasourceAdapterTupleTranslator extends AbstractTupleTranslator