hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1471561 - in /hadoop/common/branches/branch-1-win: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/
Date Wed, 24 Apr 2013 17:46:42 GMT
Author: acmurthy
Date: Wed Apr 24 17:45:51 2013
New Revision: 1471561

URL: http://svn.apache.org/r1471561
Log:
MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent with mapred API w.r.t
Mapper.cleanup and Reducer.cleanup; in the sense that cleanup is now called even if there
is an error. The old mapred API already ensures that Mapper.close and Reducer.close are invoked
during error handling. Note that it is an incompatible change; however, end-users can override
Mapper.run and Reducer.run to get the old (inconsistent) behaviour. Contributed by Arun C.
Murthy.

Added:
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
Modified:
    hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Reducer.java

Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1471561&r1=1471560&r2=1471561&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Wed Apr 24 17:45:51 2013
@@ -381,3 +381,11 @@ Branch-hadoop-1-win (branched from branc
     MAPREDUCE-5066. Added a timeout for the job.end.notification.url.
     (Ivan Mitic via acmurthy)
 
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense
+    that cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users
+    can override Mapper.run and Reducer.run to get the old (inconsistent)
+    behaviour. (acmurthy)
+

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1471561&r1=1471560&r2=1471561&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Apr 24 17:45:51 2013
@@ -48,12 +48,11 @@ import org.apache.hadoop.fs.FileSystem.S
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Deserializer;
@@ -62,13 +61,8 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -435,10 +429,14 @@ class MapTask extends Task {
     try {
       runner.run(in, new OldOutputCollector(collector, conf), reporter);
       collector.flush();
-    } finally {
-      //close
-      in.close();                               // close input
+      
+      in.close();
+      in = null;
       collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
 
@@ -763,7 +761,9 @@ class MapTask extends Task {
       input.initialize(split, mapperContext);
       mapper.run(mapperContext);
       input.close();
+      input = null;
       output.close(mapperContext);
+      output = null;
     } catch (NoSuchMethodException e) {
       throw new IOException("Can't find Context constructor", e);
     } catch (InstantiationException e) {
@@ -772,6 +772,9 @@ class MapTask extends Task {
       throw new IOException("Can't invoke Context constructor", e);
     } catch (IllegalAccessException e) {
       throw new IOException("Can't invoke Context constructor", e);
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
     }
   }
 
@@ -1738,5 +1741,59 @@ class MapTask extends Task {
       super(s);
     }
   }
+  
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 
 }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1471561&r1=1471560&r2=1471561&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 24 17:45:51 2013
@@ -488,14 +488,16 @@ class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
-        reduceOutputCounter, job, reporter, finalName);
+    RecordWriter<OUTKEY, OUTVALUE> out = 
+        new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+            reduceOutputCounter, job, reporter, finalName);
+    final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
     
     OutputCollector<OUTKEY,OUTVALUE> collector = 
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -528,18 +530,14 @@ class ReduceTask extends Task {
 
       //Clean up: repeated in catch block below
       reducer.close();
+      reducer = null;
+      
       out.close(reporter);
+      out = null;
       //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-        
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
-      
-      throw ioe;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
 
@@ -647,8 +645,11 @@ class ReduceTask extends Task {
                                                trackedRW, committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
   }
 
   private static enum CopyOutputErrorType {
@@ -2959,4 +2960,15 @@ class ReduceTask extends Task {
     return Integer.numberOfTrailingZeros(hob) +
       (((hob >>> 1) & value) == 0 ? 0 : 1);
   }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Mapper.java?rev=1471561&r1=1471560&r2=1471561&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Mapper.java Wed Apr 24 17:45:51 2013
@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYO
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKeyValue()) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=1471561&r1=1471560&r2=1471561&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Wed Apr 24 17:45:51 2013
@@ -172,9 +172,12 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKey()) {
-      reduce(context.getCurrentKey(), context.getValues(), context);
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java?rev=1471561&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java Wed Apr 24 17:45:51 2013
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.WordCount.IntSumReducer;
+import org.apache.hadoop.examples.WordCount.TokenizerMapper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMapperReducerCleanup {
+
+  static boolean mapCleanup = false;
+  static boolean reduceCleanup = false;
+  static boolean recordReaderCleanup = false;
+  static boolean recordWriterCleanup = false;
+  
+  static void reset() {
+    mapCleanup = false;
+    reduceCleanup = false; 
+    recordReaderCleanup = false;
+    recordWriterCleanup = false;
+  }
+  
+  private static class FailingMapper
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    /** Map method with different behavior based on the thread id */
+    public void map(LongWritable key, Text val, Context c)
+        throws IOException, InterruptedException {
+      throw new IOException("TestMapperReducerCleanup");
+    }
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      mapCleanup = true;
+      super.cleanup(context);
+    }
+  }
+
+  private static class TrackingTokenizerMapper extends TokenizerMapper {
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
+        throws IOException, InterruptedException {
+      mapCleanup = true;
+      super.cleanup(context);
+    }
+    
+  }
+
+  private static class FailingReducer
+      extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
+
+    public void reduce(LongWritable key, Iterable<Text> vals, Context context)
+        throws IOException, InterruptedException {
+      throw new IOException("TestMapperReducerCleanup");
+    }
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      reduceCleanup = true;
+      super.cleanup(context);
+    }
+  }
+
+  private static class TrackingIntSumReducer extends IntSumReducer {
+
+    protected void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      reduceCleanup = true;
+      super.cleanup(context);
+    }
+}
+
+  public static class TrackingTextInputFormat extends TextInputFormat {
+
+    public static class TrackingRecordReader extends LineRecordReader {
+      @Override
+      public synchronized void close() throws IOException {
+        recordReaderCleanup = true;
+        super.close();
+      }
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(
+        InputSplit split, TaskAttemptContext context) {
+      return new TrackingRecordReader();
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static class TrackingTextOutputFormat extends TextOutputFormat {
+    
+    public static class TrackingRecordWriter extends LineRecordWriter {
+
+      public TrackingRecordWriter(DataOutputStream out) {
+        super(out);
+      }
+
+      @Override
+      public synchronized void close(TaskAttemptContext context)
+          throws IOException {
+        recordWriterCleanup = true;
+        super.close(context);
+      }
+
+    }
+    
+    @Override
+    public RecordWriter getRecordWriter(TaskAttemptContext job)
+        throws IOException, InterruptedException {
+      Configuration conf = job.getConfiguration();
+
+      Path file = getDefaultWorkFile(job, "");
+      FileSystem fs = file.getFileSystem(conf);
+      FSDataOutputStream fileOut = fs.create(file, false);
+      
+      return new TrackingRecordWriter(fileOut);
+    }
+  
+  }
+
+
+  /**
+   * Create a single input file in the input directory.
+   * @param dirPath the directory in which the file resides
+   * @param id the file id number
+   * @param numRecords how many records to write to each file.
+   */
+  private void createInputFile(Path dirPath, int id, int numRecords)
+      throws IOException {
+    final String MESSAGE = "This is a line in a file: ";
+
+    Path filePath = new Path(dirPath, "" + id);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+    for (int i = 0; i < numRecords; i++) {
+      w.write(MESSAGE + id + " " + i + "\n");
+    }
+
+    w.close();
+  }
+
+  private final String INPUT_DIR = "input";
+  private final String OUTPUT_DIR = "output";
+
+  private Path getInputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(INPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), INPUT_DIR);
+    }
+  }
+
+  private Path getOutputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(OUTPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), OUTPUT_DIR);
+    }
+  }
+
+  private Path createInput() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path inputPath = getInputPath();
+
+    // Clear the input directory if it exists, first.
+    if (fs.exists(inputPath)) {
+      fs.delete(inputPath, true);
+    }
+
+    // Create an input file
+    createInputFile(inputPath, 0, 10);
+
+    return inputPath;
+  }
+
+  @Test
+  public void testMapCleanup() throws Exception {
+    reset();
+    
+    Job job = new Job();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(FailingMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(0);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+
+  @Test
+  public void testReduceCleanup() throws Exception {
+    reset();
+    
+    Job job = new Job();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(TrackingTokenizerMapper.class);
+    job.setReducerClass(FailingReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(reduceCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+  
+  @Test
+  public void testJobSuccessCleanup() throws Exception {
+    reset();
+    
+    Job job = new Job();
+
+    Path inputPath = createInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(TrackingTokenizerMapper.class);
+    job.setReducerClass(TrackingIntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputFormatClass(TrackingTextOutputFormat.class);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    Assert.assertTrue(mapCleanup);
+    Assert.assertTrue(reduceCleanup);
+    Assert.assertTrue(recordReaderCleanup);
+    Assert.assertTrue(recordWriterCleanup);
+  }
+
+}



Mime
View raw message