tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs. (sseth)
Date Thu, 30 Jul 2015 01:52:45 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 b093151f9 -> 0dd8af995


TEZ-2636. MRInput and MultiMRInput should work for cases when there are
0 physical inputs. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0dd8af99
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0dd8af99
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0dd8af99

Branch: refs/heads/branch-0.7
Commit: 0dd8af9959f97de38ac764ae639077fd4d7bbc30
Parents: b093151
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jul 29 18:52:29 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jul 29 18:52:29 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  8 +++
 .../org/apache/tez/mapreduce/input/MRInput.java | 26 +++++++-
 .../tez/mapreduce/input/MultiMRInput.java       |  9 ++-
 .../apache/tez/mapreduce/input/TestMRInput.java | 69 ++++++++++++++++++++
 .../tez/mapreduce/input/TestMultiMRInput.java   | 34 ++++++++++
 5 files changed, 144 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0dd8af99/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a8723f..9fe03fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -416,6 +416,14 @@ TEZ-UI CHANGES (TEZ-8):
   TEZ-1783. Wrapper in standalone mode.
   TEZ-1820. Fix wrong links.
 
+Release 0.5.5: Unreleased
+
+INCOMPATIBLE CHANGES
+
+ALL CHANGES:
+  TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
+  TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
+
 Release 0.5.4: 2015-06-26
 
 ALL CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/0dd8af99/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 270f68f..70365cd 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -445,7 +445,9 @@ public class MRInput extends MRInputBase {
 
   @Override
   public void start() {
-    Preconditions.checkState(getNumPhysicalInputs() == 1, "Expecting only 1 physical input for MRInput");
+    Preconditions.checkState(getNumPhysicalInputs() == 0 || getNumPhysicalInputs() == 1,
+        "Expecting 0 or 1 physical input for MRInput");
+    LOG.info("MRInput setup to received {} events", getNumPhysicalInputs());
   }
 
   @Private
@@ -502,6 +504,24 @@ public class MRInput extends MRInputBase {
         .checkState(readerCreated == false,
             "Only a single instance of record reader can be created for this input.");
     readerCreated = true;
+    if (getNumPhysicalInputs() == 0) {
+      return new KeyValueReader() {
+        @Override
+        public boolean next() throws IOException {
+          return false;
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException {
+          return null;
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException {
+          return null;
+        }
+      };
+    }
     rrLock.lock();
     try {
       if (!mrReader.isSetup())
@@ -515,6 +535,10 @@ public class MRInput extends MRInputBase {
 
   @Override
   public void handleEvents(List<Event> inputEvents) throws Exception {
+    if (getNumPhysicalInputs() == 0) {
+      throw new IllegalStateException(
+          "Unexpected event. MRInput has been setup to receive 0 events");
+    }
     if (eventReceived || inputEvents.size() != 1) {
       throw new IllegalStateException(
           "MRInput expects only a single input. Received: current eventListSize: "

http://git-wip-us.apache.org/repos/asf/tez/blob/0dd8af99/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 425d737..44d9c96 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -110,6 +110,9 @@ public class MultiMRInput extends MRInputBase {
     super.initialize();
     LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: "
         + getNumPhysicalInputs());
+    if (getNumPhysicalInputs() == 0) {
+      getContext().inputIsReady();
+    }
     return null;
   }
 
@@ -140,6 +143,10 @@ public class MultiMRInput extends MRInputBase {
   public void handleEvents(List<Event> inputEvents) throws Exception {
     lock.lock();
     try {
+      if (getNumPhysicalInputs() == 0) {
+        throw new IllegalStateException(
+            "Unexpected event. MultiMRInput has been setup to receive 0 events");
+      }
       Preconditions.checkState(eventCount.get() + inputEvents.size() <= getNumPhysicalInputs(),
           "Unexpected event. All physical sources already initialized");
       for (Event event : inputEvents) {
@@ -197,6 +204,6 @@ public class MultiMRInput extends MRInputBase {
 
   @Override
   public void start() throws Exception {
-    Preconditions.checkState(getNumPhysicalInputs() >= 1, "Expecting one or more physical inputs");
+    Preconditions.checkState(getNumPhysicalInputs() >= 0, "Expecting zero or more physical inputs");
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0dd8af99/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
new file mode 100644
index 0000000..61b6f81
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.junit.Test;
+
+public class TestMRInput {
+
+  @Test(timeout = 5000)
+  public void test0PhysicalInputs() throws IOException {
+    InputContext inputContext = mock(InputContext.class);
+
+    DataSourceDescriptor dsd = MRInput.createConfigBuilder(new Configuration(false),
+        FileInputFormat.class, "testPath").build();
+
+    ApplicationId applicationId = ApplicationId.newInstance(1000, 1);
+    doReturn(dsd.getInputDescriptor().getUserPayload()).when(inputContext).getUserPayload();
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn(1).when(inputContext).getTaskIndex();
+    doReturn(1).when(inputContext).getTaskAttemptNumber();
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+
+
+    MRInput mrInput = new MRInput(inputContext, 0);
+
+    mrInput.initialize();
+
+    mrInput.start();
+
+    assertFalse(mrInput.getReader().next());
+
+    List<Event> events = new LinkedList<Event>();
+    try {
+      mrInput.handleEvents(events);
+      fail("HandleEvents should cause an input with 0 physical inputs to fail");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalStateException);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/0dd8af99/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 4031140..f390d8a 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
@@ -85,6 +86,39 @@ public class TestMultiMRInput {
   }
 
   @Test(timeout = 5000)
+  public void test0PhysicalInputs() throws Exception {
+
+    Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
+    JobConf jobConf = new JobConf(defaultConf);
+    jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, workDir);
+
+    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+    builder.setGroupingEnabled(false);
+    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
+    byte[] payload = builder.build().toByteArray();
+
+    InputContext inputContext = createTezInputContext(payload);
+
+
+    MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);
+
+    mMrInput.initialize();
+
+    mMrInput.start();
+
+    assertEquals(0, mMrInput.getKeyValueReaders().size());
+
+    List<Event> events = new LinkedList<Event>();
+    try {
+      mMrInput.handleEvents(events);
+      fail("HandleEvents should cause an input with 0 physical inputs to fail");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalStateException);
+    }
+  }
+
+  @Test(timeout = 5000)
   public void testSingleSplit() throws Exception {
 
     Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");


Mime
View raw message