asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!
Date Mon, 19 Feb 2018 21:49:23 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2404

Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................

PLEASE EDIT to provide a meaningful commit message!

The following commits from your working branch will be included:

commit 43228caa7733c093dd98dff6b9ec04a940f1b4b3
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Date:   Sun Feb 18 14:09:26 2018 -0800

    [NO ISSUE][ING] Prevent duplicate active runtimes in NCs

    - user model changes: no
    - storage format changes: no
    - interface changes: yes
      - Add getApplication() to NodeControllerService

    details:
    - Previously, when an active runtime was registered and its
      runtime id was already registered, the duplicate registration
      was silently ignored.
    - After this change, such a registration throws an exception.
    - In addition, this change ensures that all previous tasks
      of a CC on an NC are completed before the registration
      is completed.

Change-Id: I2626046ca6e809d964497a6e075f5b8848a7c1ea
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
13 files changed, 121 insertions(+), 15 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/04/2404/1

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index f9aef4c..77ca2c2 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -73,7 +73,9 @@
         if (shutdown) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_SHUTDOWN);
         }
-        runtimes.putIfAbsent(runtime.getRuntimeId(), runtime);
+        if (runtimes.putIfAbsent(runtime.getRuntimeId(), runtime) != null) {
+            throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_ALREADY_REGISTERED, runtime.getRuntimeId());
+        }
     }
 
     public void deregisterRuntime(ActiveRuntimeId id) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 1758daa..0902203 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -182,8 +182,9 @@
     protected void finish(ActiveEvent event) throws HyracksDataException {
         LOGGER.log(level, "the job " + jobId + " finished");
         if (numRegistered != numDeRegistered) {
-            LOGGER.log(Level.WARN, "the job " + jobId + " finished with reported runtime
registrations = "
-                    + numRegistered + " and deregistrations = " + numDeRegistered + " on
node controllers");
+            LOGGER.log(Level.WARN,
+                    "the job {} finished with reported runtime registrations = {} and deregistrations
= {} on node controllers",
+                    jobId, numRegistered, numDeRegistered);
         }
         jobId = null;
         Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>)
event.getEventObject();
@@ -194,8 +195,7 @@
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
             setState((state == ActivityState.STOPPING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED);
-            if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING
-                    && prevState != ActivityState.RESUMING && prevState !=
ActivityState.STOPPING) {
+            if (prevState == ActivityState.RUNNING) {
                 recover();
             }
         } else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index b0382f7..6b59942 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -207,7 +207,7 @@
     }
 
     @Override
-    public void onRegisterNode(CcId ccId) throws Exception {
+    public void completeRegistration(CcId ccId) throws HyracksDataException {
         if (startupCompleted) {
             /*
              * If the node completed its startup before, then this is a re-registration with
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 1a83603..bc5c365 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -255,6 +255,7 @@
     public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
     public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111;
     public static final int PARSER_COLLECTION_ITEM_CANNOT_BE_NULL = 3112;
+    public static final int ACTIVE_RUNTIME_ALREADY_REGISTERED = 3113;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 995b541..9bf513e 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -244,6 +244,7 @@
 3110 = Feed failed while reading a new record
 3111 = Feed %1$s is not connected to any dataset
 3112 = Array/Multiset item cannot be null
+3113 = Active Runtime %1$s is already registered
 
 # Lifecycle management errors
 4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index af6cb92..9a07fb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.application;
 
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 
@@ -34,5 +35,14 @@
      */
     IFileDeviceResolver getFileDeviceResolver();
 
-    void onRegisterNode(CcId ccId) throws Exception;
+    /**
+     * Called when a node ensures that no leftover resources are still there
+     * from previous registrations
+     *
+     * @param ccId
+     *            the id of the cluster controller
+     * @throws HyracksDataException
+     *             if a failure completing the registration takes place
+     */
+    void completeRegistration(CcId ccId) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index f8fe77f..936f6dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -328,7 +328,6 @@
     }
 
     private void stopApplication() throws Exception {
-
         application.stop();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index ea16032..fe18461 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -60,7 +61,7 @@
     }
 
     @Override
-    public void onRegisterNode(CcId ccId) throws Exception {
+    public void completeRegistration(CcId ccId) throws HyracksDataException {
         // no-op
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0f40b60..1883947 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -62,6 +62,7 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
@@ -95,7 +96,6 @@
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.util.PidHelper;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.Tracer;
@@ -419,7 +419,6 @@
         if (distributedState != null) {
             getDistributedState().put(ccId, distributedState);
         }
-        application.onRegisterNode(ccId);
         IClusterController ccs = ccc.getClusterControllerService();
         NodeParameters nodeParameters = ccc.getNodeParameters();
 
@@ -798,4 +797,8 @@
         return application.getApplicationContext();
     }
 
+    public INCApplication getApplication() {
+        return application;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9b32cc7..16c8156 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -102,6 +102,8 @@
 
     private volatile boolean aborted;
 
+    private volatile boolean completed = false;
+
     private NodeControllerService ncs;
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
@@ -255,8 +257,7 @@
         if (aborted) {
             return false;
         }
-        pendingThreads.add(t);
-        return true;
+        return pendingThreads.add(t);
     }
 
     private synchronized void removePendingThread(Thread t) {
@@ -276,6 +277,7 @@
     public void run() {
         Thread ct = Thread.currentThread();
         String threadName = ct.getName();
+        ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
         // the thread is not escaped from interruption.
         if (!addPendingThread(ct)) {
@@ -285,7 +287,6 @@
                     .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(),
taskAttemptId));
             return;
         }
-        ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         try {
             Exception operatorException = null;
             try {
@@ -354,6 +355,7 @@
             ct.setName(threadName);
             close();
             removePendingThread(ct);
+            completed = true;
         }
         if (!exceptions.isEmpty()) {
             if (LOGGER.isWarnEnabled()) {
@@ -461,6 +463,7 @@
         return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name,
start, length);
     }
 
+    @Override
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }
@@ -469,4 +472,8 @@
     public IStatsCollector getStatsCollector() {
         return statsCollector;
     }
+
+    public boolean isCompleted() {
+        return completed;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 2bcf414..40c0e97 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hyracks.control.nc.work;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.Deque;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -50,12 +52,14 @@
         if (dpm == null) {
             LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
         }
+        Deque<Task> abortedTasks = new ArrayDeque<>();
         Collection<Joblet> joblets = ncs.getJobletMap().values();
         // TODO(mblow): should we have one jobletmap per cc?
         joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet
-> {
             Collection<Task> tasks = joblet.getTaskMap().values();
             for (Task task : tasks) {
                 task.abort();
+                abortedTasks.add(task);
             }
             final JobId jobId = joblet.getJobId();
             if (dpm != null) {
@@ -64,5 +68,6 @@
             }
             ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
         });
+        ncs.getWorkQueue().schedule(new EnsureAllCcTasksCompleted(ncs, ccId, abortedTasks));
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
new file mode 100644
index 0000000..8c609ec
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Deque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.Task;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class EnsureAllCcTasksCompleted extends AbstractWork {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2);
+    private final NodeControllerService ncs;
+    private final CcId ccId;
+    private final Deque<Task> abortedTasks;
+    private final long startTime;
+
+    public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task>
abortedTasks) {
+        this.ncs = ncs;
+        this.ccId = ccId;
+        this.abortedTasks = abortedTasks;
+        startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public void run() {
+        int numTasks = abortedTasks.size();
+        for (int i = 0; i < numTasks; i++) {
+            Task task = abortedTasks.poll();
+            if (!task.isCompleted()) {
+                abortedTasks.add(task);
+            }
+        }
+        if (abortedTasks.isEmpty()) {
+            // all tasks have completed
+            try {
+                ncs.getApplication().completeRegistration(ccId);
+            } catch (HyracksDataException e) {
+                LOGGER.log(Level.WARN, "Failed to complete registration", e);
+            }
+        } else {
+            if (System.currentTimeMillis() - startTime > TIMEOUT) {
+                LOGGER.log(Level.ERROR,
+                        "Failed to abort all previous tasks associated with CC {} after {}ms.
Giving up", ccId,
+                        TIMEOUT);
+            } else {
+                ncs.getWorkQueue().schedule(this);
+            }
+        }
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index 15248e7..ab9aaec 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 
@@ -46,7 +47,7 @@
     }
 
     @Override
-    public void onRegisterNode(CcId ccs) throws Exception {
+    public void completeRegistration(CcId ccs) throws HyracksDataException {
         // No-op
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2404
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2626046ca6e809d964497a6e075f5b8848a7c1ea
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message