tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1542. Fix a Local Mode crash on concurrentModificationException. Contributed by Chen He.
Date Fri, 24 Oct 2014 17:47:08 GMT
Repository: tez
Updated Branches:
  refs/heads/master b8b317237 -> 9e1eb5277


TEZ-1542. Fix a Local Mode crash on concurrentModificationException.
Contributed by Chen He.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9e1eb527
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9e1eb527
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9e1eb527

Branch: refs/heads/master
Commit: 9e1eb5277dccfc16bebe07bab86351ff258bdd45
Parents: b8b3172
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Oct 24 10:46:54 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Oct 24 10:46:54 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/common/EnvironmentUpdateUtils.java      |  4 +-
 .../tez/common/TestEnvironmentUpdateUtils.java  | 64 +++++++++++++++++++-
 3 files changed, 66 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9e1eb527/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae6bc31..f6cbef5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -63,6 +63,7 @@ ALL CHANGES:
   TEZ-1141. DAGStatus.Progress should include number of failed and killed attempts.
   TEZ-1424. Fixes to DAG text representation in debug mode.
   TEZ-1590. Fetchers should not report failures after the Processor on the task completes.
+  TEZ-1542. Fix a Local Mode crash on concurrentModificationException.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/9e1eb527/tez-common/src/main/java/org/apache/tez/common/EnvironmentUpdateUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/EnvironmentUpdateUtils.java b/tez-common/src/main/java/org/apache/tez/common/EnvironmentUpdateUtils.java
index cd519d9..0e597b3 100644
--- a/tez-common/src/main/java/org/apache/tez/common/EnvironmentUpdateUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/EnvironmentUpdateUtils.java
@@ -38,7 +38,7 @@ public class EnvironmentUpdateUtils {
    * @param key System environment variable
    * @param value Value to assign to system environment variable
    */
-  public static void put(String key, String value){
+  public synchronized static void put(String key, String value){
     Map<String, String> environment = new HashMap<String, String>(System.getenv());
     environment.put(key, value);
     if (!Shell.WINDOWS) {
@@ -57,7 +57,7 @@ public class EnvironmentUpdateUtils {
    * environment variable and the value is the value to assign the system
    * environment variable
    */
-  public static void putAll(Map<String, String> additionalEnvironment) {
+  public synchronized static void putAll(Map<String, String> additionalEnvironment)
{
     Map<String, String> environment = new HashMap<String, String>(System.getenv());
     environment.putAll(additionalEnvironment);
     if (!Shell.WINDOWS) {

http://git-wip-us.apache.org/repos/asf/tez/blob/9e1eb527/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java
b/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java
index d8c6980..be241d0 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestEnvironmentUpdateUtils.java
@@ -20,8 +20,20 @@ package org.apache.tez.common;
 
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 public class TestEnvironmentUpdateUtils {
 
   @Test
@@ -31,4 +43,54 @@ public class TestEnvironmentUpdateUtils {
     assertEquals("Environment was not set propertly", "test.value1", System.getenv("test.environment1"));
     assertEquals("Environment was not set propertly", "test.value2", System.getenv("test.environment2"));
   }
-}
+
+  @Test
+  public void testConcurrentRequests() throws InterruptedException {
+    int timeoutSecond = 5;
+    int concurThread = 10;
+    int exceptionCount = 0;
+    List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
+    List<ListenableFuture<Object>> pendingTasks = new ArrayList<ListenableFuture<Object>>();
+    final ExecutorService callbackExecutor = Executors.newFixedThreadPool(concurThread,
+        new ThreadFactoryBuilder().setDaemon(false).setNameFormat("CallbackExecutor").build());
+    ListeningExecutorService taskExecutorService =
+        MoreExecutors.listeningDecorator(callbackExecutor);
+    while(concurThread > 0){
+      ListenableFuture<Object> runningTaskFuture =
+          taskExecutorService.submit(new EnvironmentRequest());
+      pendingTasks.add(runningTaskFuture);
+      concurThread--;
+    }
+
+    //waiting for all threads submitted to thread pool
+    for (ListenableFuture<Object> future : pendingTasks) {
+     try {
+        future.get();
+      } catch (ExecutionException e) {
+        exceptionCount++;
+      }
+    }
+
+    //stop accepting new threads and shutdown threadpool
+    taskExecutorService.shutdown();
+    try {
+      if(!taskExecutorService.awaitTermination(timeoutSecond, TimeUnit.SECONDS)) {
+        taskExecutorService.shutdownNow();
+      }
+    } catch (InterruptedException ie) {
+      taskExecutorService.shutdownNow();
+    }
+
+    assertEquals(0, exceptionCount);
+  }
+
+  private class EnvironmentRequest implements Callable<Object> {
+
+    @Override
+    public Object call() throws Exception {
+      EnvironmentUpdateUtils.put("test.environment.concurrent"
+          +Thread.currentThread().getId(), "test.evironment.concurrent");
+      return null;
+    }
+  }
+ }


Mime
View raw message