asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/2] incubator-asterixdb git commit: Asterix-1389 Fix Deadlocks in Feed Connections
Date Fri, 08 Apr 2016 16:36:23 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master db831a455 -> fd0147101


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
index 7b74ef9..0dd87d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -22,29 +22,19 @@ import org.apache.asterix.external.feed.watch.FeedJobInfo;
 
 public class FeedOperationCounter {
     private FeedJobInfo feedJobInfo;
-    private int providersCount;
-    private int jobsCount;
+    private int partitionCount;
     private boolean failedIngestion = false;
 
-    public FeedOperationCounter(int providersCount, int jobsCount) {
-        this.providersCount = providersCount;
-        this.jobsCount = jobsCount;
+    public FeedOperationCounter(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
-    public int getProvidersCount() {
-        return providersCount;
+    public int getPartitionCount() {
+        return partitionCount;
     }
 
-    public void setProvidersCount(int providersCount) {
-        this.providersCount = providersCount;
-    }
-
-    public int getJobsCount() {
-        return jobsCount;
-    }
-
-    public void setJobsCount(int jobsCount) {
-        this.jobsCount = jobsCount;
+    public void setPartitionCount(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
     public boolean isFailedIngestion() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
index 0c8724e..ad3c1c9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
@@ -27,7 +27,8 @@ public interface IFeedLifecycleEventSubscriber {
         FEED_COLLECT_STARTED,
         FEED_INTAKE_FAILURE,
         FEED_COLLECT_FAILURE,
-        FEED_ENDED
+        FEED_INTAKE_ENDED,
+        FEED_COLLECT_ENDED
     }
 
     public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
index 28b713e..448ea47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
@@ -45,6 +45,6 @@ public interface IFeedLifecycleListener extends IJobLifecycleListener, IClusterE
 
     public List<String> getCollectLocations(FeedConnectionId feedConnectionId);
 
-    boolean isFeedConnectionActive(FeedConnectionId connectionId);
+    boolean isFeedConnectionActive(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber eventSubscriber);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
new file mode 100644
index 0000000..49b23ed
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.job.JobId;
+
+public class FeedPartitionStartMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final FeedId feedId;
+    private final JobId jobId;
+
+    public FeedPartitionStartMessage(FeedId feedId, JobId jobId) {
+        this.feedId = feedId;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.FEED_PROVIDER_READY;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
deleted file mode 100644
index 4c81c5b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.job.JobId;
-
-public class FeedProviderReadyMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final FeedId feedId;
-    private final JobId jobId;
-
-    public FeedProviderReadyMessage(FeedId feedId, JobId jobId) {
-        this.feedId = feedId;
-        this.jobId = jobId;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.FEED_PROVIDER_READY;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
index 3e42169..b69a7b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -36,6 +36,7 @@ public class FeedConnectJobInfo extends FeedJobInfo {
     private List<String> collectLocations;
     private List<String> computeLocations;
     private List<String> storageLocations;
+    private int partitionStarts = 0;
 
     public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
             IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
@@ -91,4 +92,12 @@ public class FeedConnectJobInfo extends FeedJobInfo {
         this.computeFeedJoint = computeFeedJoint;
     }
 
+    public void partitionStart() {
+        partitionStarts++;
+    }
+
+    public boolean collectionStarted() {
+        return partitionStarts == collectLocations.size();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 87e1edb..178d2d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -34,6 +34,7 @@ import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
@@ -85,6 +86,10 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
             switch (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
                 case INTAKE:
                     handleCompleteConnection();
+                    // Notify CC that Collection started
+                    ctx.sendApplicationMessageToCC(
+                            new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()),
+                            null);
                     break;
                 case COMPUTE:
                     handlePartialConnection();
@@ -93,7 +98,6 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     throw new IllegalStateException("Invalid source type "
                             + ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType());
             }
-
             State state = collectRuntime.waitTillCollectionOver();
             if (state.equals(State.FINISHED)) {
                 feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 6771010..cd20900 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -36,7 +36,7 @@ import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
@@ -117,7 +117,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
                         adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
-                ctx.sendApplicationMessageToCC(new FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()),
+                ctx.sendApplicationMessageToCC(new FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
                         null);
                 feedFrameWriter.open();
             } else {


Mime
View raw message