asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/2] incubator-asterixdb git commit: Fixed Feed Connect Statement
Date Wed, 23 Mar 2016 19:03:35 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 9486133a1 -> b2a5170c6


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/b2a5170c/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
new file mode 100644
index 0000000..7b74ef9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.watch.FeedJobInfo;
+
+public class FeedOperationCounter {
+    private FeedJobInfo feedJobInfo;
+    private int providersCount;
+    private int jobsCount;
+    private boolean failedIngestion = false;
+
+    public FeedOperationCounter(int providersCount, int jobsCount) {
+        this.providersCount = providersCount;
+        this.jobsCount = jobsCount;
+    }
+
+    public int getProvidersCount() {
+        return providersCount;
+    }
+
+    public void setProvidersCount(int providersCount) {
+        this.providersCount = providersCount;
+    }
+
+    public int getJobsCount() {
+        return jobsCount;
+    }
+
+    public void setJobsCount(int jobsCount) {
+        this.jobsCount = jobsCount;
+    }
+
+    public boolean isFailedIngestion() {
+        return failedIngestion;
+    }
+
+    public void setFailedIngestion(boolean failedIngestion) {
+        this.failedIngestion = failedIngestion;
+    }
+
+    public FeedJobInfo getFeedJobInfo() {
+        return feedJobInfo;
+    }
+
+    public void setFeedJobInfo(FeedJobInfo feedJobInfo) {
+        this.feedJobInfo = feedJobInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/b2a5170c/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
new file mode 100644
index 0000000..4c81c5b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.job.JobId;
+
+public class FeedProviderReadyMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final FeedId feedId;
+    private final JobId jobId;
+
+    public FeedProviderReadyMessage(FeedId feedId, JobId jobId) {
+        this.feedId = feedId;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.FEED_PROVIDER_READY;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/b2a5170c/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index d0348c2..8475e45 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -147,22 +147,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     }
 
     private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
-        int waitCycleCount = 0;
-        ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        while (ingestionRuntime == null && waitCycleCount < 1000) {
-            try {
-                Thread.sleep(3000);
-                waitCycleCount++;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
-                }
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-                break;
-            }
-            ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        }
-        return (IngestionRuntime) ingestionRuntime;
+        return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
     }
 
     public ConnectionLocation getSubscriptionLocation() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/b2a5170c/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index c1748d9..6771010 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -36,6 +36,7 @@ import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
@@ -115,6 +116,9 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
                 ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
                         adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
+                // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
+                ctx.sendApplicationMessageToCC(new FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()),
+                        null);
                 feedFrameWriter.open();
             } else {
                 if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {


Mime
View raw message