asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xi...@apache.org
Subject asterixdb git commit: Fix record loss for certain feed types
Date Tue, 04 Apr 2017 17:52:26 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 39d09a05c -> 45b72a9a0


Fix record loss for certain feed types

1. Fix the blind replacement of the connector between FeedCollector
   and AssignOperator.
2. Wrap AssignOperator into the FeedMetaOperator to make sure the
   operators inside (udf, accessor, etc.) can handle messages in the
   feed workflow.
3. Revise feed connection job merge function.
4. Test case fix.

Change-Id: I4e1f7dd3621482a11feb675a93b826ae2cb965a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1652
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/45b72a9a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/45b72a9a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/45b72a9a

Branch: refs/heads/master
Commit: 45b72a9a0d478325c55de7a36cd73117ff0a24f2
Parents: 39d09a0
Author: Xikui Wang <xkkwww@gmail.com>
Authored: Mon Apr 3 16:59:57 2017 -0700
Committer: Xikui Wang <xkkwww@gmail.com>
Committed: Tue Apr 4 10:52:02 2017 -0700

----------------------------------------------------------------------
 ...ceRandomPartitioningFeedComputationRule.java |  2 +-
 .../apache/asterix/utils/FeedOperations.java    | 46 ++++++++++----------
 .../change-feed-with-meta-pk-in-meta.4.adm      |  2 +-
 .../change-feed-with-meta-pk-in-meta.5.adm      |  2 +-
 .../FeedCollectOperatorDescriptor.java          |  3 +-
 .../operators/FeedMetaOperatorDescriptor.java   |  2 +-
 .../dataset-with-meta-record.5.adm              |  2 +-
 7 files changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/45b72a9a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 1a0ecd9..dfb73ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -61,7 +61,7 @@ public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebrai
 
         final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
         FeedConnection feedConnection = feedDataSource.getFeedConnection();
-        if (feedConnection.getAppliedFunctions() == null) {
+        if (feedConnection.getAppliedFunctions() == null || feedConnection.getAppliedFunctions().size()
== 0) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/45b72a9a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 4ea524a..874cbb1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -80,6 +80,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.Constraint;
@@ -207,12 +208,15 @@ public class FeedOperations {
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations
= new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
         List<JobId> jobIds = new ArrayList<>();
+        FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
             FeedConnection curFeedConnection = feedConnections.get(iter1);
             JobSpecification subJob = jobsList.get(iter1);
             operatorIdMapping.clear();
             Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap();
+            FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(),
+                    feedConnections.get(iter1).getDatasetName());
 
             FeedPolicyEntity feedPolicyEntity =
                     FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
@@ -221,26 +225,36 @@ public class FeedOperations {
             for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet())
{
                 IOperatorDescriptor opDesc = entry.getValue();
                 OperatorDescriptorId oldId = opDesc.getOperatorId();
-                OperatorDescriptorId opId;
+                OperatorDescriptorId opId = null;
                 if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                         && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary())
{
                     String operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
-                    FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(jobSpec,
-                            new FeedConnectionId(ingestionOp.getEntityId(),
-                                    feedConnections.get(iter1).getDatasetName()),
-                            opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE,
false, operandId);
+                    metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+                            feedConnectionId, opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE,
+                            operandId);
                     opId = metaOp.getOperatorId();
                     opDesc.setOperatorId(opId);
                 } else {
                     if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
                         AlgebricksMetaOperatorDescriptor algOp = (AlgebricksMetaOperatorDescriptor)
opDesc;
-                        for (IPushRuntimeFactory runtimeFactory : algOp.getPipeline().getRuntimeFactories())
{
-                            if (runtimeFactory instanceof StreamSelectRuntimeFactory) {
-                                ((StreamSelectRuntimeFactory) runtimeFactory).retainMissing(true,
0);
+                        IPushRuntimeFactory[] runtimeFactories = algOp.getPipeline().getRuntimeFactories();
+                        // Tweak AssignOp to work with messages
+                        if (runtimeFactories[0] instanceof AssignRuntimeFactory &&
runtimeFactories.length > 1) {
+                            IConnectorDescriptor connectorDesc = subJob.getOperatorInputMap()
+                                    .get(opDesc.getOperatorId()).get(0);
+                            // anything on the network interface needs to be message compatible
+                            if (connectorDesc instanceof MToNPartitioningConnectorDescriptor)
{
+                                metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+                                        feedConnectionId, opDesc, feedPolicyEntity.getProperties(),
+                                        FeedRuntimeType.COMPUTE, null);
+                                opId = metaOp.getOperatorId();
+                                opDesc.setOperatorId(opId);
                             }
                         }
                     }
-                    opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    if (opId == null) {
+                        opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    }
                 }
                 operatorIdMapping.put(oldId, opId);
             }
@@ -250,9 +264,6 @@ public class FeedOperations {
             for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : subJob.getConnectorMap().entrySet())
{
                 IConnectorDescriptor connDesc = entry.getValue();
                 ConnectorDescriptorId newConnId;
-                if (entry.getKey().getId() == 0) {
-                    continue;
-                }
                 if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
                     MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor)
connDesc;
                     connDesc = new MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
@@ -277,11 +288,8 @@ public class FeedOperations {
                 if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) {
                     jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), replicateOp,
iter1, leftOpDesc,
                             leftOp.getRight());
-                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), leftOpDesc,
leftOp.getRight(),
-                            rightOpDesc, rightOp.getRight());
-                } else {
-                    jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc,
rightOp.getRight());
                 }
+                jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
             }
 
             // prepare for setting partition constraints
@@ -295,16 +303,10 @@ public class FeedOperations {
                 switch (lexpr.getTag()) {
                     case PARTITION_COUNT:
                         opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         operatorCounts.put(operatorIdMapping.get(opId), (int) ((ConstantExpression)
cexpr).getValue());
                         break;
                     case PARTITION_LOCATION:
                         opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
                         List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId());
                         if (locations == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/45b72a9a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
index cbdc907..c31da8b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
@@ -1 +1 @@
-788
+804
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/45b72a9a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
index cbdc907..14b76cb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
@@ -1 +1 @@
-788
+804

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/45b72a9a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index c4cb650..97e5511 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -54,8 +54,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     private final FeedRuntimeType subscriptionLocation;
 
     public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
ARecordType atype,
-            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
-            FeedRuntimeType subscriptionLocation) {
+            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, FeedRuntimeType
subscriptionLocation) {
         super(spec, 1, 1);
         this.recordDescriptors[0] = rDesc;
         this.outputType = atype;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/45b72a9a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index d0d9f7b..cffd303 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -76,7 +76,7 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
 
     public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId
feedConnectionId,
             final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String>
feedPolicyProperties,
-            final FeedRuntimeType runtimeType, final boolean enableSubscriptionMode, final
String operandId) {
+            final FeedRuntimeType runtimeType, final String operandId) {
         super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/45b72a9a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
index 2975e63..c31da8b 100644
--- a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
+++ b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
@@ -1 +1 @@
-788
\ No newline at end of file
+804
\ No newline at end of file


Mime
View raw message