ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jlun...@apache.org
Subject ambari git commit: AMBARI-13065: RU: Core Slaves restart schedule is extremely slow on very large cluster (jluniya)
Date Sat, 19 Sep 2015 00:27:50 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 07629edc8 -> 1aae6477b


AMBARI-13065: RU: Core Slaves restart schedule is extremely slow on very large cluster (jluniya)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1aae6477
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1aae6477
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1aae6477

Branch: refs/heads/branch-2.1
Commit: 1aae6477b777996b3a1a2abfe30de471b9f4a85f
Parents: 07629ed
Author: Jayush Luniya <jluniya@hortonworks.com>
Authored: Fri Sep 18 17:23:32 2015 -0700
Committer: Jayush Luniya <jluniya@hortonworks.com>
Committed: Fri Sep 18 17:27:45 2015 -0700

----------------------------------------------------------------------
 .../actionmanager/ActionDBAccessorImpl.java     |  70 ++++--
 .../server/actionmanager/ActionScheduler.java   |  10 +-
 .../ambari/server/actionmanager/Request.java    |  40 ++-
 .../apache/ambari/server/utils/LoopBody.java    |  27 +++
 .../apache/ambari/server/utils/Parallel.java    | 242 +++++++++++++++++++
 .../ambari/server/utils/ParallelLoopResult.java |  63 +++++
 .../AmbariManagementControllerTest.java         |   3 +-
 .../ambari/server/utils/TestParallel.java       | 188 ++++++++++++++
 8 files changed, 613 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 44e2eae..146a1e9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -48,6 +48,9 @@ import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.utils.LoopBody;
+import org.apache.ambari.server.utils.Parallel;
+import org.apache.ambari.server.utils.ParallelLoopResult;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,11 +136,25 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public List<Stage> getAllStages(long requestId) {
-    List<Stage> stages = new ArrayList<Stage>();
-    for (StageEntity stageEntity : stageDAO.findByRequestId(requestId)) {
-      stages.add(stageFactory.createExisting(stageEntity));
+    List<StageEntity> stageEntities = stageDAO.findByRequestId(requestId);
+    ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities, new LoopBody<StageEntity, Stage>() {
+      @Override
+      public Stage run(StageEntity stageEntity) {
+        return stageFactory.createExisting(stageEntity);
+      }
+    });
+    if(loopResult.getIsCompleted()) {
+      return loopResult.getResult();
+    } else {
+      // Fetch any missing results sequentially
+      List<Stage> stages = loopResult.getResult();
+      for(int i = 0; i < stages.size(); i++) {
+        if(stages.get(i) == null) {
+          stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
+        }
+      }
+      return stages;
     }
-    return stages;
   }
 
   @Override
@@ -206,15 +223,25 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public List<Stage> getStagesInProgress() {
-    List<Stage> stages = new ArrayList<Stage>();
-
     List<StageEntity> stageEntities = stageDAO.findByCommandStatuses(HostRoleStatus.IN_PROGRESS_STATUSES);
-
-    for (StageEntity stageEntity : stageEntities) {
-      stages.add(stageFactory.createExisting(stageEntity));
+    ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities, new LoopBody<StageEntity, Stage>() {
+      @Override
+      public Stage run(StageEntity stageEntity) {
+        return stageFactory.createExisting(stageEntity);
+      }
+    });
+    if(loopResult.getIsCompleted()) {
+      return loopResult.getResult();
+    } else {
+      // Fetch any missing results sequentially
+      List<Stage> stages = loopResult.getResult();
+      for(int i = 0; i < stages.size(); i++) {
+        if(stages.get(i) == null) {
+          stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
+        }
+      }
+      return stages;
     }
-
-    return stages;
   }
 
   /**
@@ -662,11 +689,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   @Override
   public List<Request> getRequests(Collection<Long> requestIds){
     List<RequestEntity> requestEntities = requestDAO.findByPks(requestIds);
-    List<Request> requests = new ArrayList<Request>(requestEntities.size());
-    for (RequestEntity requestEntity : requestEntities) {
-      requests.add(requestFactory.createExisting(requestEntity));
+    ParallelLoopResult<Request> loopResult = Parallel.forLoop(requestEntities, new LoopBody<RequestEntity, Request>() {
+      @Override
+      public Request run(RequestEntity requestEntity) {
+        return requestFactory.createExisting(requestEntity);
+      }
+    });
+    if(loopResult.getIsCompleted()) {
+      return loopResult.getResult();
+    } else {
+      // Fetch any missing results sequentially
+      List<Request> requests = loopResult.getResult();
+      for(int i = 0; i < requests.size(); i++) {
+        if(requests.get(i) == null) {
+          requests.set(i, requestFactory.createExisting(requestEntities.get(i)));
+        }
+      }
+      return requests;
     }
-    return requests;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 5833125..1e019b7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -468,14 +468,8 @@ class ActionScheduler implements Runnable {
 
     if (prevStageId > 0) {
       // Find previous stage instance
-      List<Stage> allStages = db.getAllStages(stage.getRequestId());
-      Stage prevStage = null;
-      for (Stage s : allStages) {
-        if (s.getStageId() == prevStageId) {
-          prevStage = s;
-          break;
-        }
-      }
+      String actionId = StageUtils.getActionId(stage.getRequestId(), prevStageId);
+      Stage prevStage = db.getStage(actionId);
 
       // If the previous stage is skippable then we shouldn't automatically fail the given stage
       if (prevStage == null || prevStage.isSkippable()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
index faebb20..26447e6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.actionmanager;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import com.google.inject.Inject;
@@ -36,6 +37,9 @@ import org.apache.ambari.server.orm.entities.RequestOperationLevelEntity;
 import org.apache.ambari.server.orm.entities.RequestResourceFilterEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.utils.LoopBody;
+import org.apache.ambari.server.utils.Parallel;
+import org.apache.ambari.server.utils.ParallelLoopResult;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -158,7 +162,7 @@ public class Request {
   /**
    * Load existing request from database
    */
-  public Request(@Assisted RequestEntity entity, StageFactory stageFactory, Clusters clusters){
+  public Request(@Assisted RequestEntity entity, final StageFactory stageFactory, Clusters clusters){
     if (entity == null) {
       throw new RuntimeException("Request entity cannot be null.");
     }
@@ -189,13 +193,37 @@ public class Request {
       this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId();
     }
 
-    for (StageEntity stageEntity : entity.getStages()) {
-      Stage stage = stageFactory.createExisting(stageEntity);
-      stages.add(stage);
+    Collection<StageEntity> stageEntities = entity.getStages();
+    if(stageEntities == null || stageEntities.isEmpty()) {
+      stages = Collections.emptyList();
+    } else {
+      List<StageEntity> stageEntityList;
+      if(stageEntities instanceof List) {
+        stageEntityList = (List<StageEntity>) stageEntities;
+      } else {
+        stageEntityList = new ArrayList<StageEntity>(stageEntities);
+      }
+      ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntityList, new LoopBody<StageEntity, Stage>() {
+        @Override
+        public Stage run(StageEntity stageEntity) {
+          return stageFactory.createExisting(stageEntity);
+        }
+      });
+      List<Stage> stageList;
+      if(loopResult.getIsCompleted()) {
+        stageList = loopResult.getResult();
+      } else {
+        // Fetch any missing results sequentially
+        stageList = loopResult.getResult();
+        for(int i = 0; i < stages.size(); i++) {
+          if(stageList.get(i) == null) {
+            stageList.set(i, stageFactory.createExisting(stageEntityList.get(i)));
+          }
+        }
+      }
+      stages = stageList;
     }
-
     resourceFilters = filtersFromEntity(entity);
-
     operationLevel = operationLevelFromEntity(entity);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java
new file mode 100644
index 0000000..0a93814
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/LoopBody.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+/**
+ * Interface for loop body invoked during each iteration of parallel loops
+ * @param <T> The type of source data that will be operated upon
+ * @param <R> The type of result data
+ */
+public interface LoopBody <T, R> {
+  R run(T t);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
new file mode 100644
index 0000000..a67ee5c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  This class provides support for parallel loops.
+ *  Iterations in the loop run in parallel in parallel loops.
+ */
+public class Parallel {
+
+  /**
+   * Max pool size
+   */
+  private static final int MAX_POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors());
+
+  /**
+   * Keep alive time (1 sec)
+   */
+  private static final int KEEP_ALIVE_TIME_MILLISECONDS = 1000;
+
+  /**
+   * Poll duration (10 secs)
+   */
+  private static final int POLL_DURATION_MILLISECONDS = 10000;
+
+  /**
+   * Logger
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(Parallel.class);
+
+  /**
+   *  Thread pool executor
+   */
+  private static ExecutorService executor = initExecutor();
+
+  /**
+   * Initialize executor
+   *
+   * @return
+   */
+  private static ExecutorService initExecutor() {
+
+    BlockingQueue<Runnable> blockingQueue = new SynchronousQueue<Runnable>(); // Using synchronous queue
+
+    // Create thread pool
+    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+        0,                                        // Core pool size
+        MAX_POOL_SIZE,                            // Max pool size
+        KEEP_ALIVE_TIME_MILLISECONDS,             // Keep alive time for idle threads
+        TimeUnit.MILLISECONDS,
+        blockingQueue,                            // Using synchronous queue
+        new ParallelLoopsThreadFactory(),         // Thread pool factory to use
+        new ThreadPoolExecutor.CallerRunsPolicy() // Rejected tasks will run on calling thread.
+    );
+    threadPool.allowCoreThreadTimeOut(true);
+    LOG.debug(
+        "Parallel library initialized: ThreadCount = {}, CurrentPoolSize = {}, CorePoolSize = {}, MaxPoolSize = {}",
+        Thread.activeCount(), threadPool.getPoolSize(), threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize());
+    return threadPool;
+  }
+
+  /**
+   * Executes a "for" parallel loop operation over all items in the data source in which iterations run in parallel.
+   *
+   * @param source      Data source to iterate over
+   * @param loopBody    The loop body that is invoked once per iteration
+   * @param <T>         The type of data in the source
+   * @param <R>         The type of data to be returned
+   * @return            {@link ParallelLoopResult} Parallel loop result
+   */
+  public static <T, R> ParallelLoopResult<R> forLoop(
+      List<T> source,
+      final LoopBody<T, R> loopBody) {
+
+    if(source == null || source.isEmpty()) {
+      return new ParallelLoopResult<R>(true, (List<R>) Collections.emptyList());
+    }
+    return forLoop(source, 0, source.size(), loopBody);
+  }
+
+  /**
+   * Executes a "for" parallel loop operation in which iterations run in parallel.
+   *
+   * @param source      Data source to iterate over
+   * @param startIndex  The loop start index, inclusive
+   * @param endIndex    The loop end index, exclusive
+   * @param loopBody    The loop body that is invoked once per iteration
+   * @param <T>         The type of data in the source
+   * @param <R>         The type of data to be returned
+   * @return            {@link ParallelLoopResult} Parallel loop result
+   *
+   */
+  public static <T, R> ParallelLoopResult<R> forLoop(
+      final List<T> source,
+      int startIndex,
+      int endIndex,
+      final LoopBody<T, R> loopBody) {
+
+    if(source == null || source.isEmpty() || startIndex == endIndex) {
+      return new ParallelLoopResult<R>(true, (List<R>) Collections.emptyList());
+    }
+    if(startIndex < 0 || startIndex >= source.size()) {
+      throw new IndexOutOfBoundsException("startIndex is out of bounds");
+    }
+    if(endIndex < 0 || endIndex > source.size()) {
+      throw new IndexOutOfBoundsException("endIndex is out of bounds");
+    }
+    if(startIndex > endIndex) {
+      throw new IndexOutOfBoundsException("startIndex > endIndex");
+    }
+    if(source.size() == 1 || (endIndex - startIndex) == 1) {
+      // Don't spawn a new thread for a single element list
+      List<R> result = Collections.singletonList(loopBody.run(source.get(startIndex)));
+      return new ParallelLoopResult<R>(true, result);
+    }
+
+    // Create a completion service for each call
+    CompletionService<ResultWrapper<R>> completionService = new ExecutorCompletionService<ResultWrapper<R>>(executor);
+
+    List<Future<ResultWrapper<R>>> futures = new LinkedList<Future<ResultWrapper<R>>>();
+    for (int i = startIndex; i < endIndex; i++) {
+      final Integer k = i;
+      Future<ResultWrapper<R>> future = completionService.submit(new Callable<ResultWrapper<R>>() {
+        @Override
+        public ResultWrapper<R> call() throws Exception {
+          ResultWrapper<R> res = new ResultWrapper<R>();
+          res.index = k;
+          res.result = loopBody.run(source.get(k));
+          return res;
+        }
+      });
+      futures.add(future);
+    }
+
+    boolean completed = true;
+    R[] result = (R[]) new Object[futures.size()];
+    try {
+      for (int i = 0; i < futures.size(); i++) {
+        Future<ResultWrapper<R>> futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS);
+        if (futureResult == null) {
+          // Time out! no progress was made during the last poll duration. Abort the threads and cancel the threads.
+          LOG.error("Completion service in Parallel.forLoop timed out!");
+          completed = false;
+          for(int fIndex = 0; fIndex < futures.size(); fIndex++) {
+            Future<ResultWrapper<R>> future = futures.get(fIndex);
+            if(future.isDone()) {
+              LOG.debug("    Task - {} has already completed", fIndex);
+            } else if(future.isCancelled()) {
+              LOG.debug("    Task - {} has already been cancelled", fIndex);
+            } else if(!future.cancel(true)) {
+              LOG.debug("    Task - {} could not be cancelled", fIndex);
+            } else {
+              LOG.debug("    Task - {} successfully cancelled", fIndex);
+            }
+          }
+          break;
+        } else {
+          ResultWrapper<R> res = futureResult.get();
+          if(res.result != null) {
+            result[res.index] = res.result;
+          } else {
+            LOG.error("Result is null for {}", res.index);
+            completed = false;
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Caught InterruptedException in Parallel.forLoop", e);
+      completed = false;
+    } catch (ExecutionException e) {
+      LOG.error("Caught ExecutionException in Parallel.forLoop", e);
+      completed = false;
+    }
+    // Return parallel loop result
+    return new ParallelLoopResult<R>(completed, Arrays.asList(result));
+  }
+
+  /**
+   * A custom {@link ThreadFactory} for the threads that will handle
+   * {@link org.apache.ambari.server.utils.Parallel} loop iterations.
+   */
+  private static final class ParallelLoopsThreadFactory implements ThreadFactory {
+
+    private static final AtomicInteger threadId = new AtomicInteger(1);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = Executors.defaultThreadFactory().newThread(r);
+      thread.setName("parallel-loop-" + threadId.getAndIncrement());
+      return thread;
+    }
+  }
+
+  /**
+   * Result wrapper for Parallel.forLoop used internally
+   * @param <R> Type of result to wrap
+   */
+  private static final class ResultWrapper<R> {
+    int index;
+    R result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
new file mode 100644
index 0000000..85ff706
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.List;
+
+/**
+ * Provides completion status and results of a {@link Parallel} loop
+ * @param <R> Result type
+ */
+public class ParallelLoopResult<R> {
+  private boolean isCompleted;
+  private List<R> result;
+
+  /**
+   * Flag to indicate if the parallel loop completed all iterations
+   * @return
+   */
+  public boolean getIsCompleted() {
+    return isCompleted;
+  }
+
+  /**
+   * Flag to indicate if the parallel loop completed all iterations
+   * @return
+   */
+  public void setIsCompleted(boolean completed) {
+    isCompleted = completed;
+  }
+
+  public List<R> getResult() {
+    return result;
+  }
+
+  public void setResult(List<R> result) {
+    this.result = result;
+  }
+
+  /**
+   * Constructor
+   * @param completed Indicates if the parallel loop completed all iterations
+   * @param result    Results of parallel loop. Results could be partially completed.
+   */
+  public ParallelLoopResult(boolean completed, List<R> result) {
+    isCompleted = completed;
+    this.result = result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
index 05eb277..a322064 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
@@ -5486,7 +5486,8 @@ public class AmbariManagementControllerTest {
     long requestId2 = startService(clusterName, serviceName1, true, true);
     long requestId3 = startService(clusterName, serviceName2, true, true);
 
-    stages = actionDB.getAllStages(requestId2);
+    stages = new ArrayList<>();
+    stages.addAll(actionDB.getAllStages(requestId2));
     stages.addAll(actionDB.getAllStages(requestId3));
     HostRoleCommand hdfsCmdHost3 = null;
     HostRoleCommand hdfsCmdHost2 = null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/1aae6477/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
new file mode 100644
index 0000000..0628f20
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import org.junit.Test;
+import junit.framework.Assert;
+
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+import java.util.LinkedList;
+
+/**
+ * Tests parallel loops
+ */
+public class TestParallel {
+
+  /**
+   * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop base cases.
+   * @throws Exception
+   */
+  @Test
+  public void testParallelForLoopBaseCases() throws Exception {
+
+    ParallelLoopResult<Integer> nullLoopResult = Parallel.forLoop(
+        null,
+        new LoopBody<Integer, Integer>() {
+          @Override
+          public Integer run(Integer integer) {
+            return integer;
+          }
+        });
+    Assert.assertTrue(nullLoopResult.getIsCompleted());
+    Assert.assertTrue(nullLoopResult.getResult().isEmpty());
+
+    ParallelLoopResult<Integer> emptyLoopResult = Parallel.forLoop(
+        new ArrayList<Integer>(),
+        new LoopBody<Integer, Integer>() {
+          @Override
+          public Integer run(Integer integer) {
+            return integer;
+          }
+        });
+    Assert.assertTrue(emptyLoopResult.getIsCompleted());
+    Assert.assertTrue(emptyLoopResult.getResult().isEmpty());
+
+    ParallelLoopResult<Integer> singleElementLoopResult = Parallel.forLoop(
+        Collections.singletonList(7),
+        new LoopBody<Integer, Integer>() {
+          @Override
+          public Integer run(Integer integer) {
+            return integer;
+          }
+        });
+    Assert.assertTrue(singleElementLoopResult.getIsCompleted());
+    List<Integer> singleElementList = singleElementLoopResult.getResult();
+    Assert.assertTrue(singleElementLoopResult.getIsCompleted());
+    Assert.assertFalse(singleElementList.isEmpty());
+    Assert.assertEquals(1, singleElementList.size());
+    Assert.assertNotNull(singleElementList.get(0));
+  }
+
+  /**
+   * Tests Parallel.forLoop
+   * @throws Exception
+   */
+  @Test
+  public void testParallelForLoop() throws Exception {
+    final List<Integer> input = new LinkedList<Integer>();
+    for(int i = 0; i < 10; i++) {
+      input.add(i);
+    }
+
+    ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+      @Override
+      public Integer run(Integer in1) {
+        return in1 * in1;
+      }
+    });
+    Assert.assertTrue(loopResult.getIsCompleted());
+    Assert.assertNotNull(loopResult.getResult());
+
+    List<Integer> output = loopResult.getResult();
+    Assert.assertEquals(input.size(), output.size());
+    for(int i = 0; i < input.size(); i++) {
+      Assert.assertEquals( i * i,  (int)output.get(i));
+    }
+  }
+
+  /**
+   * Tests nested {@link org.apache.ambari.server.utils.Parallel} forLoop
+   * @throws Exception
+   */
+  @Test
+  public void testNestedParallelForLoopIterationFailures() throws Exception {
+    final List<Integer> input = new LinkedList<Integer>();
+    for(int i = 0; i < 10; i++) {
+      input.add(i);
+    }
+    final ParallelLoopResult<Integer>[] innerLoopResults =  new ParallelLoopResult[input.size()];
+    ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+      @Override
+      public Integer run(final Integer in1) {
+        int sq = in1 * in1;
+        ParallelLoopResult<Integer> innerLoopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+          @Override
+          public Integer run(Integer in2) {
+            return in1 * in2;
+          }
+        });
+        innerLoopResults[in1] = innerLoopResult;
+        return in1 * in1;
+      }
+    });
+    Assert.assertNotNull(loopResult);
+    Assert.assertTrue(loopResult.getIsCompleted());
+    List<Integer> output = loopResult.getResult();
+    Assert.assertNotNull(output);
+    Assert.assertEquals(input.size(), output.size());
+
+    for(int i = 0; i < input.size(); i++) {
+      Assert.assertEquals(i * i, (int) output.get(i));
+      ParallelLoopResult<Integer> innerLoopResult = innerLoopResults[i];
+      Assert.assertNotNull(innerLoopResult);
+      Assert.assertTrue(innerLoopResult.getIsCompleted());
+      List<Integer> innerOutput = innerLoopResult.getResult();
+      Assert.assertNotNull(innerOutput);
+      Assert.assertEquals(input.size(), innerOutput.size());
+
+      for(int j = 0; j < input.size(); j++) {
+        Assert.assertEquals(i*j, (int) innerOutput.get(j));
+      }
+    }
+  }
+
+  /**
+   * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop iteration failures
+   * @throws Exception
+   */
+  @Test
+  public void testParallelForLoopIterationFailures() throws Exception {
+    final List<Integer> input = new LinkedList<Integer>();
+    for(int i = 0; i < 10; i++) {
+      input.add(i);
+    }
+    final List<Integer> failForList = Arrays.asList(new Integer[] { 2, 5, 7});
+    ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+      @Override
+      public Integer run(Integer in1) {
+        if(failForList.contains(in1)) {
+          // Return null
+          return null;
+        }
+        return in1 * in1;
+      }
+    });
+    Assert.assertFalse(loopResult.getIsCompleted());
+    Assert.assertNotNull(loopResult.getResult());
+    List<Integer> output = loopResult.getResult();
+    Assert.assertEquals(input.size(), output.size());
+
+    for(int i = 0; i < input.size(); i++) {
+      if(failForList.contains(i)) {
+        Assert.assertNull(output.get(i));
+        output.set(i, i * i);
+      } else {
+        Assert.assertEquals(i * i, (int) output.get(i));
+      }
+    }
+  }
+}


Mime
View raw message