flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [2/3] flink git commit: [FLINK-2445] Improve HadoopOutputFormatTests
Date Wed, 23 Mar 2016 16:37:59 GMT
[FLINK-2445] Improve HadoopOutputFormatTests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da989c42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da989c42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da989c42

Branch: refs/heads/release-1.0
Commit: da989c423306212646f456d410562d9ec3779b24
Parents: edf343a
Author: Martin Liesenberg <martin.liesenberg@gmail.com>
Authored: Thu Feb 11 15:59:01 2016 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Mar 23 17:37:15 2016 +0100

----------------------------------------------------------------------
 .../hadoop/mapred/HadoopOutputFormatTest.java   | 229 ++++++++++++++++
 .../mapreduce/HadoopOutputFormatTest.java       | 265 ++++++++++++-------
 2 files changed, 395 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da989c42/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
new file mode 100644
index 0000000..409feb4
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HadoopOutputFormatTest {
+
+	@Test
+	public void testOpen() throws Exception {
+
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		JobConf jobConf = spy(new JobConf());
+		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.open(1, 1);
+
+		verify(jobConf, times(2)).getOutputCommitter();
+		verify(outputCommitter, times(1)).setupJob(any(JobContext.class));
+		verify(dummyOutputFormat, times(1)).getRecordWriter(any(FileSystem.class), any(JobConf.class), anyString(), any(Progressable.class));
+	}
+
+	@Test
+	public void testConfigureWithConfigurable() {
+		ConfigurableDummyOutputFormat dummyOutputFormat = mock(ConfigurableDummyOutputFormat.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.configure(Matchers.<org.apache.flink.configuration.Configuration>any());
+
+		verify(dummyOutputFormat, times(1)).setConf(any(Configuration.class));
+	}
+
+	@Test
+	public void testConfigureWithJobConfigurable() {
+		JobConfigurableDummyOutputFormat dummyOutputFormat = mock(JobConfigurableDummyOutputFormat.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.configure(Matchers.<org.apache.flink.configuration.Configuration>any());
+
+		verify(dummyOutputFormat, times(1)).configure(any(JobConf.class));
+	}
+
+	@Test
+	public void testCloseWithTaskCommit() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(true);
+		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+		outputFormat.recordWriter = recordWriter;
+		outputFormat.outputCommitter = outputCommitter;
+
+		outputFormat.close();
+
+		verify(recordWriter, times(1)).close(any(Reporter.class));
+		verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testCloseWithoutTaskCommit() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(false);
+		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+		outputFormat.recordWriter = recordWriter;
+		outputFormat.outputCommitter = outputCommitter;
+
+		outputFormat.close();
+
+		verify(recordWriter, times(1)).close(any(Reporter.class));
+		verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testWriteRecord() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
+		JobConf jobConf = mock(JobConf.class);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+		outputFormat.recordWriter = recordWriter;
+
+		outputFormat.writeRecord(new Tuple2<>("key", 1L));
+
+		verify(recordWriter, times(1)).write(anyString(), anyLong());
+	}
+
+	@Test
+	public void testFinalizeGlobal() throws Exception {
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
+		JobConf jobConf = spy(new JobConf());
+		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
+
+		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
+
+		outputFormat.finalizeGlobal(1);
+
+		verify(outputCommitter, times(1)).commitJob(any(JobContext.class));
+	}
+
+	public class DummyOutputFormat implements OutputFormat<String, Long> {
+
+		@Override
+		public RecordWriter<String, Long> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String s, Progressable progressable) throws IOException {
+			return null;
+		}
+
+		@Override
+		public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOException {
+
+		}
+	}
+
+	public class ConfigurableDummyOutputFormat extends DummyOutputFormat implements Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {
+
+		}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+	}
+
+	public class JobConfigurableDummyOutputFormat extends DummyOutputFormat implements JobConfigurable {
+
+		@Override
+		public void configure(JobConf jobConf) {
+
+		}
+	}
+
+	public class DummyOutputCommitter extends OutputCommitter {
+
+		@Override
+		public void setupJob(JobContext jobContext) throws IOException {
+
+		}
+
+		@Override
+		public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+		}
+
+		@Override
+		public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+			return false;
+		}
+
+		@Override
+		public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+		}
+
+		@Override
+		public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+		}
+	}
+
+	public class DummyRecordWriter implements RecordWriter<String, Long> {
+
+		@Override
+		public void write(String s, Long aLong) throws IOException {
+
+		}
+
+		@Override
+		public void close(Reporter reporter) throws IOException {
+
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da989c42/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
index 215ca95..7b126fc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
@@ -19,112 +19,179 @@
 package org.apache.flink.api.java.hadoop.mapreduce;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 
 public class HadoopOutputFormatTest {
 
-    private static final String PATH = "an/ignored/file/";
-    private Map<String, Long> map;
-
-    @Test
-    public void testWriteRecord() {
-        OutputFormat<String, Long> dummyOutputFormat = new DummyOutputFormat();
-        String key = "Test";
-        Long value = 1L;
-        map = new HashMap<>();
-        map.put(key, 0L);
-        try {
-            Job job = Job.getInstance();
-            Tuple2<String, Long> tuple = new Tuple2<>();
-            tuple.setFields(key, value);
-            HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job);
-
-            hadoopOutputFormat.recordWriter = new DummyRecordWriter();
-            hadoopOutputFormat.writeRecord(tuple);
-
-            Long expected = map.get(key);
-            assertEquals(expected, value);
-        } catch (IOException e) {
-            fail();
-        }
-    }
-
-    @Test
-    public void testOpen() {
-        OutputFormat<String, Long> dummyOutputFormat = new DummyOutputFormat();
-        try {
-            Job job = Job.getInstance();
-            HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job);
-
-            hadoopOutputFormat.recordWriter = new DummyRecordWriter();
-            hadoopOutputFormat.open(1, 4);
-        } catch (IOException e) {
-            fail();
-        }
-    }
-
-    @Test
-    public void testClose() {
-        OutputFormat<String, Long> dummyOutputFormat = new DummyOutputFormat();
-        try {
-            Job job = Job.getInstance();
-            HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job);
-
-            hadoopOutputFormat.recordWriter = new DummyRecordWriter();
-
-            final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
-            Mockito.when(outputCommitter.needsTaskCommit(Mockito.any(TaskAttemptContext.class))).thenReturn(true);
-            Mockito.doNothing().when(outputCommitter).commitTask(Mockito.any(TaskAttemptContext.class));
-            hadoopOutputFormat.outputCommitter = outputCommitter;
-            hadoopOutputFormat.configuration = new Configuration();
-            hadoopOutputFormat.configuration.set("mapred.output.dir", PATH);
-
-            hadoopOutputFormat.close();
-        } catch (IOException e) {
-            fail();
-        }
-    }
-
-
-    class DummyRecordWriter extends RecordWriter<String, Long> {
-        @Override
-        public void write(String key, Long value) throws IOException, InterruptedException {
-            map.put(key, value);
-        }
-
-        @Override
-        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-
-        }
-    }
-
-    class DummyOutputFormat extends OutputFormat<String, Long> {
-        @Override
-        public RecordWriter<String, Long> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-            return null;
-        }
-
-        @Override
-        public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-
-        }
-
-        @Override
-        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-            final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
-            Mockito.doNothing().when(outputCommitter).setupJob(Mockito.any(JobContext.class));
-
-            return outputCommitter;
-        }
-    }
+	private static final String MAPRED_OUTPUT_PATH = "an/ignored/file/";
+	private static final String MAPRED_OUTPUT_DIR_KEY = "mapred.output.dir";
+
+	@Test
+	public void testWriteRecord() throws Exception {
+
+		RecordWriter<String, Long> recordWriter = mock(DummyRecordWriter.class);
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), recordWriter, null, new Configuration());
+
+		hadoopOutputFormat.writeRecord(new Tuple2<String, Long>());
+
+		verify(recordWriter, times(1)).write(anyString(), anyLong());
+	}
+
+	@Test
+	public void testOpen() throws Exception {
+
+
+		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
+		OutputCommitter outputCommitter = setupOutputCommitter(true);
+		when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
+			Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());
+
+		hadoopOutputFormat.open(1, 4);
+
+		verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
+		verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testCloseWithNeedsTaskCommitTrue() throws Exception {
+
+		RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
+		OutputCommitter outputCommitter = setupOutputCommitter(true);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), recordWriter, outputCommitter, new Configuration());
+
+		hadoopOutputFormat.close();
+
+		verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
+		verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testCloseWithNeedsTaskCommitFalse() throws Exception {
+
+		RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
+		OutputCommitter outputCommitter = setupOutputCommitter(false);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), recordWriter, outputCommitter, new Configuration());
+
+		hadoopOutputFormat.close();
+
+		verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
+		verify(recordWriter, times(1)).close(any(TaskAttemptContext.class));
+	}
+
+	@Test
+	public void testConfigure() throws Exception {
+
+		ConfigurableDummyOutputFormat outputFormat = mock(ConfigurableDummyOutputFormat.class);
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(outputFormat, Job.getInstance(),
+			null, null, new Configuration());
+
+		hadoopOutputFormat.configure(new org.apache.flink.configuration.Configuration());
+
+		verify(outputFormat, times(1)).setConf(any(Configuration.class));
+	}
+
+	@Test
+	public void testFinalizedGlobal() throws Exception {
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
+			Job.getInstance(), null, null, new Configuration());
+
+		hadoopOutputFormat.finalizeGlobal(1);
+
+		verify(hadoopOutputFormat.outputCommitter, times(1)).commitJob(any(JobContext.class));
+	}
+
+	private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
+		OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
+		when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(needsTaskCommit);
+		doNothing().when(outputCommitter).commitTask(any(TaskAttemptContext.class));
+
+		return outputCommitter;
+	}
+
+	private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(OutputFormat<String, Long> outputFormat,
+																	 Job job,
+																	 RecordWriter<String, Long> recordWriter,
+																	 OutputCommitter outputCommitter,
+																	 Configuration configuration) {
+
+		HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(outputFormat, job);
+		hadoopOutputFormat.recordWriter = recordWriter;
+		hadoopOutputFormat.outputCommitter = outputCommitter;
+		hadoopOutputFormat.configuration = configuration;
+		hadoopOutputFormat.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);
+
+		return hadoopOutputFormat;
+	}
+
+	class DummyRecordWriter extends RecordWriter<String, Long> {
+		@Override
+		public void write(String key, Long value) throws IOException, InterruptedException {
+		}
+
+		@Override
+		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+
+		}
+	}
+
+	class DummyOutputFormat extends OutputFormat<String, Long> {
+		@Override
+		public RecordWriter<String, Long> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
+		}
+
+		@Override
+		public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+			final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
+			doNothing().when(outputCommitter).setupJob(any(JobContext.class));
+
+			return outputCommitter;
+		}
+	}
+
+	class ConfigurableDummyOutputFormat extends DummyOutputFormat implements Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+	}
 }


Mime
View raw message