tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-362. Object cache for users to re-use objects across tasks within the same container/jvm. (hitesh)
Date Tue, 20 Aug 2013 22:28:09 GMT
Updated Branches:
  refs/heads/master 34de31a89 -> a89057105


TEZ-362. Object cache for users to re-use objects across tasks within the same container/jvm.
(hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a8905710
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a8905710
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a8905710

Branch: refs/heads/master
Commit: a890571059052913d346d2ea919c37974b34ed20
Parents: 34de31a
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Aug 20 15:27:46 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Aug 20 15:27:46 2013 -0700

----------------------------------------------------------------------
 pom.xml                                         |  5 ++
 tez-dag/pom.xml                                 |  4 ++
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 25 ++++++++
 tez-engine-api/pom.xml                          |  4 ++
 .../common/objectregistry/ObjectLifeCycle.java  | 37 +++++++++++
 .../common/objectregistry/ObjectRegistry.java   | 56 +++++++++++++++++
 .../objectregistry/ObjectRegistryFactory.java   | 32 ++++++++++
 tez-engine/pom.xml                              |  4 ++
 .../objectregistry/ObjectRegistryImpl.java      | 65 ++++++++++++++++++++
 .../objectregistry/ObjectRegistryModule.java    | 43 +++++++++++++
 .../objectregistry/TestObjectRegistry.java      | 56 +++++++++++++++++
 .../tez/mapreduce/examples/MRRSleepJob.java     | 16 +++++
 12 files changed, 347 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5fde7ac..47951ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,6 +235,11 @@
        <artifactId>protobuf-java</artifactId>
        <version>${protobuf.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.google.inject</groupId>
+        <artifactId>guice</artifactId>
+        <version>3.0</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index 34194db..875a196 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -97,6 +97,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 859c43b..9f59219 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -58,7 +58,11 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.runtime.RuntimeUtils;
@@ -66,6 +70,9 @@ import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
 
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
 
 /**
  * The main() for TEZ Task processes.
@@ -103,6 +110,11 @@ public class YarnTezDagChild {
     // FIXME fix initialize metrics in child runner
     DefaultMetricsSystem.initialize("VertexTask");
 
+    ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
+    @SuppressWarnings("unused")
+    Injector injector = Guice.createInjector(
+        new ObjectRegistryModule(objectRegistry));
+
     // Security framework already loaded the tokens into current ugi
     Credentials credentials =
         UserGroupInformation.getCurrentUser().getCredentials();
@@ -144,6 +156,7 @@ public class YarnTezDagChild {
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
     int taskCount = 0;
+    TezVertexID currentVertexId = null;
     try {
       while (true) {
         // poll for new task
@@ -173,6 +186,18 @@ public class YarnTezDagChild {
               + taskContext.toString());
         }
         taskAttemptId = taskContext.getTaskAttemptId();
+        TezVertexID newVertexId = taskAttemptId.getTaskID().getVertexID();
+
+        if (currentVertexId != null) {
+          if (!currentVertexId.equals(newVertexId)) {
+            objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
+          }
+          if (!currentVertexId.getDAGId().equals(newVertexId.getDAGId())) {
+            objectRegistry.clearCache(ObjectLifeCycle.DAG);
+          }
+        }
+        currentVertexId = newVertexId;
+
         updateLoggers(taskAttemptId);
 
         final Task t = createAndConfigureTezTask(taskContext, umbilical,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/pom.xml b/tez-engine-api/pom.xml
index 2cc7276..107bdc9 100644
--- a/tez-engine-api/pom.xml
+++ b/tez-engine-api/pom.xml
@@ -40,6 +40,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
b/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
new file mode 100644
index 0000000..7099299
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+/**
+ * Defines the valid lifecycle and scope of Objects stored in ObjectRegistry.
+ * Objects are guaranteed to not be valid outside of their defined life-cycle
+ * period. Objects are not guaranteed to be retained through the defined period
+ * as they may be evicted for various reasons.
+ */
+public enum ObjectLifeCycle {
+  /** Objects are valid for the lifetime of the Tez JVM/Session
+   */
+  SESSION,
+  /** Objects are valid for the lifetime of the DAG.
+   */
+  DAG,
+  /** Objects are valid for the lifetime of the Vertex.
+   */
+  VERTEX,
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
b/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
new file mode 100644
index 0000000..a27903d
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+/**
+ * Preliminary version of a simple shared object cache to re-use
+ * objects across multiple tasks within the same container/JVM.
+ */
+public interface ObjectRegistry {
+
+  /**
+   * Insert or update object into the registry. This will remove an object
+   * associated with the same key with a different life-cycle as there is only
+   * one instance of an Object stored for a given key irrespective of the
+   * life-cycle attached to the Object.
+   * @param lifeCycle What life-cycle is the Object valid for
+   * @param key Key to identify the Object
+   * @param value Object to be inserted
+   * @return Previous Object associated with the key attached if present
+   * else null. Could return the same object if the object was associated with
+   * the same key for a different life-cycle.
+   */
+  public Object add(ObjectLifeCycle lifeCycle, String key, Object value);
+
+  /**
+   * Return the object associated with the provided key
+   * @param key Key to find object
+   * @return Object if found else null
+   */
+  public Object get(String key);
+
+  /**
+   * Delete the object associated with the provided key
+   * @param lifeCycle What life-cycle is the Object valid for
+   * @param key Key to find object
+   * @return True if an object was found and removed
+   */
+  public boolean delete(String key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
b/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
new file mode 100644
index 0000000..94352b3
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+import com.google.inject.Inject;
+
+public class ObjectRegistryFactory {
+
+  @Inject
+  private static ObjectRegistry objectRegistry;
+
+  public static ObjectRegistry getObjectRegistry() {
+    return objectRegistry;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine/pom.xml b/tez-engine/pom.xml
index 4ff5839..4ecf210 100644
--- a/tez-engine/pom.xml
+++ b/tez-engine/pom.xml
@@ -45,6 +45,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
new file mode 100644
index 0000000..351e01c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.inject.Singleton;
+
+@Singleton
+public class ObjectRegistryImpl implements ObjectRegistry {
+
+  private Map<String, Map.Entry<Object, ObjectLifeCycle>> objectCache =
+      new HashMap<String, Map.Entry<Object, ObjectLifeCycle>>();
+
+  @Override
+  public synchronized Object add(ObjectLifeCycle lifeCycle,
+      String key, Object value) {
+    Map.Entry<Object, ObjectLifeCycle> oldEntry =
+        objectCache.put(key,
+            new AbstractMap.SimpleImmutableEntry<Object, ObjectLifeCycle>(
+                value, lifeCycle));
+    return oldEntry != null ? oldEntry.getKey() : null;
+  }
+
+  @Override
+  public synchronized Object get(String key) {
+    Map.Entry<Object, ObjectLifeCycle> entry =
+        objectCache.get(key);
+    return entry != null ? entry.getKey() : null;
+  }
+
+  @Override
+  public synchronized boolean delete(String key) {
+    return (null != objectCache.remove(key));
+  }
+
+  public synchronized void clearCache(ObjectLifeCycle lifeCycle) {
+    for (Entry<String, Entry<Object, ObjectLifeCycle>> entry :
+      objectCache.entrySet()) {
+      if (entry.getValue().getValue().equals(lifeCycle)) {
+        objectCache.remove(entry.getKey());
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
new file mode 100644
index 0000000..ab346fd
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+
+public class ObjectRegistryModule extends AbstractModule {
+
+  private final ObjectRegistry objectRegistry;
+
+  public ObjectRegistryModule(ObjectRegistry objectRegistry) {
+    this.objectRegistry = objectRegistry;
+  }
+
+  @VisibleForTesting
+  public ObjectRegistryModule() {
+    objectRegistry = new ObjectRegistryImpl();
+  }
+
+  @Override
+  protected void configure() {
+    bind(ObjectRegistry.class).toInstance(this.objectRegistry);
+    requestStaticInjection(ObjectRegistryFactory.class);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java
b/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java
new file mode 100644
index 0000000..7276782
--- /dev/null
+++ b/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestObjectRegistry {
+
+  @SuppressWarnings("unused")
+  @Before
+  public void setup() {
+    Injector injector = Guice.createInjector(new ObjectRegistryModule());
+  }
+
+  @Test
+  public void testBasicCRUD() {
+    ObjectRegistry objectRegistry =
+        ObjectRegistryFactory.getObjectRegistry();
+    Assert.assertNotNull(objectRegistry);
+
+    Assert.assertNull(objectRegistry.get("foo"));
+    Assert.assertFalse(objectRegistry.delete("foo"));
+    Integer one = new Integer(1);
+    Integer two_1 = new Integer(2);
+    Integer two_2 = new Integer(3);
+    Assert.assertNull(objectRegistry.add(ObjectLifeCycle.DAG, "one", one));
+    Assert.assertEquals(one, objectRegistry.get("one"));
+    Assert.assertNull(objectRegistry.add(ObjectLifeCycle.DAG, "two", two_1));
+    Assert.assertNotNull(objectRegistry.add(ObjectLifeCycle.SESSION, "two", two_2));
+    Assert.assertNotEquals(two_1, objectRegistry.get("two"));
+    Assert.assertEquals(two_2, objectRegistry.get("two"));
+    Assert.assertTrue(objectRegistry.delete("one"));
+    Assert.assertFalse(objectRegistry.delete("one"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a8905710/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 525dcc9..df489ae 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -74,6 +74,9 @@ import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.EdgeProperty.SourceType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
 import org.apache.tez.engine.lib.input.ShuffledMergedInput;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -220,6 +223,19 @@ public class MRRSleepJob extends Configured implements Tool {
           org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
 
       TaskAttemptID taId = context.getTaskAttemptID();
+
+      ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry();
+      String fooBarVal = (String) objectRegistry.get("FooBar");
+      if (null == fooBarVal) {
+        LOG.info("Adding FooBar key to Object cache");
+        objectRegistry.add(ObjectLifeCycle.DAG,
+            "FooBar", "BarFooFromTask" + taId.getTaskID().toString());
+      } else {
+        LOG.info("Got FooBar val from Object cache"
+            + ", currentTaskId=" + taId.getTaskID().toString()
+            + ", val=" + fooBarVal);
+      }
+
       String[] taskIds = conf.getStrings(MAP_ERROR_TASK_IDS);
       if (taId.getId()+1 >= context.getMaxMapAttempts()) {
         finalAttempt = true;


Mime
View raw message