hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sun...@apache.org
Subject [17/49] hadoop git commit: HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue.
Date Fri, 24 Nov 2017 10:43:28 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 230dbad..02236eb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -26,6 +26,7 @@ import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
@@ -70,16 +71,26 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
 
   private int uploadBlockSize = DEFAULT_UPLOAD_BLOCKSIZE;
   private int partitionSize;
+  private long filesize;
 
   @Override
   public void setup() throws Exception {
     super.setup();
-    final Path testPath = getTestPath();
-    scaleTestDir = new Path(testPath, "scale");
+    scaleTestDir = new Path(getTestPath(), getTestSuiteName());
     hugefile = new Path(scaleTestDir, "hugefile");
     hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
+    filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
+        DEFAULT_HUGE_FILESIZE);
   }
 
+  /**
+   * Get the name of this test suite, which is used in path generation.
+   * Base implementation uses {@link #getBlockOutputBufferName()} for this.
+   * @return the name of the suite.
+   */
+  public String getTestSuiteName() {
+    return getBlockOutputBufferName();
+  }
 
   /**
    * Note that this can get called before test setup.
@@ -88,7 +99,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
   @Override
   protected Configuration createScaleConfiguration() {
     Configuration conf = super.createScaleConfiguration();
-    partitionSize = (int)getTestPropertyBytes(conf,
+    partitionSize = (int) getTestPropertyBytes(conf,
         KEY_HUGE_PARTITION_SIZE,
         DEFAULT_PARTITION_SIZE);
     assertTrue("Partition size too small: " + partitionSize,
@@ -99,6 +110,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
     conf.setInt(MULTIPART_SIZE, partitionSize);
     conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate");
     conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+    S3ATestUtils.disableFilesystemCaching(conf);
     return conf;
   }
 
@@ -111,17 +123,16 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
   @Test
   public void test_010_CreateHugeFile() throws IOException {
     assertFalse("Please run this test sequentially to avoid timeouts" +
-            " and bandwidth problems", isParallelExecution());
-    long filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
-        DEFAULT_HUGE_FILESIZE);
+        " and bandwidth problems", isParallelExecution());
     long filesizeMB = filesize / _1MB;
 
     // clean up from any previous attempts
     deleteHugeFile();
 
+    Path fileToCreate = getPathOfFileToCreate();
     describe("Creating file %s of size %d MB" +
             " with partition size %d buffered by %s",
-        hugefile, filesizeMB, partitionSize, getBlockOutputBufferName());
+        fileToCreate, filesizeMB, partitionSize, getBlockOutputBufferName());
 
     // now do a check of available upload time, with a pessimistic bandwidth
     // (that of remote upload tests). If the test times out then not only is
@@ -134,7 +145,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
     assertTrue(String.format("Timeout set in %s seconds is too low;" +
             " estimating upload time of %d seconds at 1 MB/s." +
             " Rerun tests with -D%s=%d",
-            timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2),
+        timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2),
         uploadTime < timeout);
     assertEquals("File size set in " + KEY_HUGE_FILESIZE + " = " + filesize
             + " is not a multiple of " + uploadBlockSize,
@@ -162,7 +173,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
     S3AInstrumentation.OutputStreamStatistics streamStatistics;
     long blocksPer10MB = blocksPerMB * 10;
     ProgressCallback progress = new ProgressCallback(timer);
-    try (FSDataOutputStream out = fs.create(hugefile,
+    try (FSDataOutputStream out = fs.create(fileToCreate,
         true,
         uploadBlockSize,
         progress)) {
@@ -219,14 +230,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
         toHuman(timer.nanosPerOperation(putRequestCount)));
     assertEquals("active put requests in \n" + fs,
         0, gaugeValue(putRequestsActive));
-    ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
-    FileStatus status = fs.getFileStatus(hugefile);
-    ContractTestUtils.assertIsFile(hugefile, status);
-    assertEquals("File size in " + status, filesize, status.getLen());
-    if (progress != null) {
-      progress.verifyNoFailures("Put file " + hugefile
-          + " of size " + filesize);
-    }
+    progress.verifyNoFailures(
+        "Put file " + fileToCreate + " of size " + filesize);
     if (streamStatistics != null) {
       assertEquals("actively allocated blocks in " + streamStatistics,
           0, streamStatistics.blocksActivelyAllocated());
@@ -234,6 +239,39 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
   }
 
   /**
+   * Get the path of the file which is to be created. This is normally
+   * {@link #hugefile}.
+   * @return the path to use when creating the file.
+   */
+  protected Path getPathOfFileToCreate() {
+    return this.hugefile;
+  }
+
+  protected Path getScaleTestDir() {
+    return scaleTestDir;
+  }
+
+  protected Path getHugefile() {
+    return hugefile;
+  }
+
+  public void setHugefile(Path hugefile) {
+    this.hugefile = hugefile;
+  }
+
+  protected Path getHugefileRenamed() {
+    return hugefileRenamed;
+  }
+
+  protected int getUploadBlockSize() {
+    return uploadBlockSize;
+  }
+
+  protected int getPartitionSize() {
+    return partitionSize;
+  }
+
+  /**
    * Progress callback from AWS. Likely to come in on a different thread.
    */
   private final class ProgressCallback implements Progressable,
@@ -294,22 +332,54 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
     }
 
     private void verifyNoFailures(String operation) {
-      assertEquals("Failures in " + operation +": " + this, 0, failures.get());
+      assertEquals("Failures in " + operation + ": " + this, 0, failures.get());
     }
   }
 
+  /**
+   * Check that the huge file exists; the test fails if it does not.
+   * @throws IOException IO failure
+   */
   void assumeHugeFileExists() throws IOException {
+    assumeFileExists(this.hugefile);
+  }
+
+  /**
+   * Assume a specific file exists.
+   * @param file file to look for
+   * @throws IOException IO problem
+   */
+  private void assumeFileExists(Path file) throws IOException {
     S3AFileSystem fs = getFileSystem();
-    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
-    FileStatus status = fs.getFileStatus(hugefile);
-    ContractTestUtils.assertIsFile(hugefile, status);
-    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
+    ContractTestUtils.assertPathExists(fs, "huge file not created",
+        file);
+    FileStatus status = fs.getFileStatus(file);
+    ContractTestUtils.assertIsFile(file, status);
+    assertTrue("File " + file + " is empty", status.getLen() > 0);
   }
 
   private void logFSState() {
     LOG.info("File System state after operation:\n{}", getFileSystem());
   }
 
+  /**
+   * This is the set of actions to perform when verifying the file actually
+   * was created. With the s3guard committer, the file doesn't come into
+   * existence; a different set of assertions must be checked.
+   */
+  @Test
+  public void test_030_postCreationAssertions() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertEquals("File size in " + status, filesize, status.getLen());
+  }
+
+  /**
+   * Read in the file using Positioned read(offset) calls.
+   * @throws Throwable failure
+   */
   @Test
   public void test_040_PositionedReadHugeFile() throws Throwable {
     assumeHugeFileExists();
@@ -323,11 +393,11 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
     describe("Positioned reads of %s %s", filetype, hugefile);
     S3AFileSystem fs = getFileSystem();
     FileStatus status = fs.getFileStatus(hugefile);
-    long filesize = status.getLen();
+    long size = status.getLen();
     int ops = 0;
     final int bufferSize = 8192;
     byte[] buffer = new byte[bufferSize];
-    long eof = filesize - 1;
+    long eof = size - 1;
 
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
     ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF;
@@ -348,23 +418,27 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
       ops++;
       LOG.info("Final stream state: {}", in);
     }
-    long mb = Math.max(filesize / _1MB, 1);
+    long mb = Math.max(size / _1MB, 1);
 
     logFSState();
-    timer.end("time to performed positioned reads of %s of %d MB ",
+    timer.end("time to perform positioned reads of %s of %d MB ",
         filetype, mb);
     LOG.info("Time per positioned read = {} nS",
         toHuman(timer.nanosPerOperation(ops)));
   }
 
+  /**
+   * Read in the entire file using read() calls.
+   * @throws Throwable failure
+   */
   @Test
   public void test_050_readHugeFile() throws Throwable {
     assumeHugeFileExists();
     describe("Reading %s", hugefile);
     S3AFileSystem fs = getFileSystem();
     FileStatus status = fs.getFileStatus(hugefile);
-    long filesize = status.getLen();
-    long blocks = filesize / uploadBlockSize;
+    long size = status.getLen();
+    long blocks = size / uploadBlockSize;
     byte[] data = new byte[uploadBlockSize];
 
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
@@ -375,11 +449,11 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
       LOG.info("Final stream state: {}", in);
     }
 
-    long mb = Math.max(filesize / _1MB, 1);
+    long mb = Math.max(size / _1MB, 1);
     timer.end("time to read file of %d MB ", mb);
     LOG.info("Time per MB to read = {} nS",
         toHuman(timer.nanosPerOperation(mb)));
-    bandwidth(timer, filesize);
+    bandwidth(timer, size);
     logFSState();
   }
 
@@ -389,18 +463,18 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
     describe("renaming %s to %s", hugefile, hugefileRenamed);
     S3AFileSystem fs = getFileSystem();
     FileStatus status = fs.getFileStatus(hugefile);
-    long filesize = status.getLen();
+    long size = status.getLen();
     fs.delete(hugefileRenamed, false);
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
     fs.rename(hugefile, hugefileRenamed);
-    long mb = Math.max(filesize / _1MB, 1);
+    long mb = Math.max(size / _1MB, 1);
     timer.end("time to rename file of %d MB", mb);
     LOG.info("Time per MB to rename = {} nS",
         toHuman(timer.nanosPerOperation(mb)));
-    bandwidth(timer, filesize);
+    bandwidth(timer, size);
     logFSState();
     FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
-    assertEquals(filesize, destFileStatus.getLen());
+    assertEquals(size, destFileStatus.getLen());
 
     // rename back
     ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
@@ -408,24 +482,49 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase
{
     timer2.end("Renaming back");
     LOG.info("Time per MB to rename = {} nS",
         toHuman(timer2.nanosPerOperation(mb)));
-    bandwidth(timer2, filesize);
+    bandwidth(timer2, size);
   }
 
+  /**
+   * Cleanup: delete the files.
+   */
   @Test
-  public void test_999_DeleteHugeFiles() throws IOException {
-    deleteHugeFile();
-    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
-    S3AFileSystem fs = getFileSystem();
-    fs.delete(hugefileRenamed, false);
-    timer2.end("time to delete %s", hugefileRenamed);
-    ContractTestUtils.rm(fs, getTestPath(), true, true);
+  public void test_800_DeleteHugeFiles() throws IOException {
+    try {
+      deleteHugeFile();
+      delete(hugefileRenamed, false);
+    } finally {
+      ContractTestUtils.rm(getFileSystem(), getTestPath(), true, false);
+    }
+  }
+
+  /**
+   * After all the work, dump the statistics.
+   */
+  @Test
+  public void test_900_dumpStats() {
+    StringBuilder sb = new StringBuilder();
+
+    getFileSystem().getStorageStatistics()
+        .forEach(kv -> sb.append(kv.toString()).append("\n"));
+
+    LOG.info("Statistics\n{}", sb);
   }
 
   protected void deleteHugeFile() throws IOException {
-    describe("Deleting %s", hugefile);
-    NanoTimer timer = new NanoTimer();
-    getFileSystem().delete(hugefile, false);
-    timer.end("time to delete %s", hugefile);
+    delete(hugefile, false);
+  }
+
+  /**
+   * Delete any file, time how long it took.
+   * @param path path to delete
+   * @param recursive recursive flag
+   */
+  protected void delete(Path path, boolean recursive) throws IOException {
+    describe("Deleting %s", path);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    getFileSystem().delete(path, recursive);
+    timer.end("time to delete %s", path);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
index e8200da..b68f559 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
+++ b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml
@@ -27,7 +27,7 @@
     <name>hadoop.tmp.dir</name>
     <value>target/build/test</value>
     <description>A base for other temporary directories.</description>
-    <final>true</final>
+    <final>false</final>
   </property>
 
   <property>
@@ -36,6 +36,13 @@
     <description>The endpoint for s3a://landsat-pds URLs</description>
   </property>
 
+  <property>
+    <name>fs.s3a.bucket.landsat-pds.multipart.purge</name>
+    <value>false</value>
+    <description>Don't try to purge uploads in the read-only bucket, as
+    it will only create log noise.</description>
+  </property>
+
   <!-- Make sure S3Guard is disabled for read-only bucket tests. -->
   <property>
     <name>fs.s3a.bucket.landsat-pds.metadatastore.impl</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
index acbe7f1..383dec8 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
@@ -18,15 +18,50 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
 
 log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
+log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN
+log4j.logger.org.apache.hadoop.util.GSet=WARN
+# MiniDFS clusters can be noisy
+log4j.logger.org.apache.hadoop.hdfs.server=ERROR
+log4j.logger.org.apache.hadoop.metrics2=WARN
+log4j.logger.org.apache.hadoop.net.NetworkTopology=WARN
+log4j.logger.org.apache.hadoop.util.JvmPauseMonitor=WARN
+log4j.logger.org.apache.hadoop.ipc=WARN
+log4j.logger.org.apache.hadoop.http=WARN
+log4j.logger.org.apache.hadoop.security.authentication.server.AuthenticationFilter=WARN
+log4j.logger.org.apache.hadoop.util.HostsFileReader=WARN
+log4j.logger.org.apache.commons.beanutils=WARN
+log4j.logger.org.apache.hadoop.hdfs.StateChange=WARN
+log4j.logger.BlockStateChange=WARN
+log4j.logger.org.apache.hadoop.hdfs.DFSUtil=WARN
+
+## YARN can be noisy too
+log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.scheduler=WARN
+log4j.logger.org.apache.hadoop.yarn.server.nodemanager=WARN
+log4j.logger.org.apache.hadoop.yarn.event=WARN
+log4j.logger.org.apache.hadoop.yarn.util.ResourceCalculatorPlugin=ERROR
+log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor=WARN
+log4j.logger.org.apache.hadoop.mapred.IndexCache=WARN
+log4j.logger.org.apache.hadoop.yarn.webapp.WebApps=WARN
+log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.security=WARN
+log4j.logger.org.apache.hadoop.yarn.util.AbstractLivelinessMonitor=WARN
+log4j.logger.org.apache.hadoop.security.token.delegation=WARN
+log4j.logger.org.apache.hadoop.mapred.ShuffleHandler=WARN
+log4j.logger.org.apache.hadoop.ipc.Server=WARN
+#log4j.logger.=WARN
+
 
 # for debugging low level S3a operations, uncomment these lines
 # Log all S3A classes
 #log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
-
+#log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO
+#log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO
 # Log S3Guard classes
 #log4j.logger.org.apache.hadoop.fs.s3a.s3guard=DEBUG
 
-# Enable debug logging of AWS DynamoDB client
+# Log Committer classes
+#log4j.logger.org.apache.hadoop.fs.s3a.commit=DEBUG
+
+# Enable debug logging of AWS Dynamo client
 #log4j.logger.com.amazonaws.services.dynamodbv2.AmazonDynamoDB=DEBUG
 
 # Log all HTTP requests made; includes S3 interaction. This may
@@ -35,3 +70,6 @@ log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
 
 # Turn on low level HTTP protocol debugging
 #log4j.logger.com.amazonaws.thirdparty.apache.http=DEBUG
+
+log4j.logger.org.apache.hadoop.mapreduce.lib.output=DEBUG
+log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
index eddff20..491bcf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
@@ -18,212 +18,47 @@
 
 package org.apache.hadoop.registry.client.binding;
 
-import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
 import org.apache.hadoop.registry.client.exceptions.NoRecordException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.JsonSerialization;
 
-import java.io.DataOutputStream;
 import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 
 /**
  * Support for marshalling objects to and from JSON.
  *  <p>
- * It constructs an object mapper as an instance field.
- * and synchronizes access to those methods
- * which use the mapper
+ * This extends {@link JsonSerialization} with the notion
+ * of a marker field in the JSON file, with
+ * <ol>
+ *   <li>a fail-fast check for it before even trying to parse.</li>
+ *   <li>Specific IOException subclasses for a failure.</li>
+ * </ol>
+ * The rationale for this is not only to support different things in the
+ * registry, but the fact that all ZK nodes have a size &gt; 0 when examined.
+ *
  * @param <T> Type to marshal.
  */
-@InterfaceAudience.Private()
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class JsonSerDeser<T> {
+public class JsonSerDeser<T> extends JsonSerialization<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class);
   private static final String UTF_8 = "UTF-8";
   public static final String E_NO_DATA = "No data at path";
   public static final String E_DATA_TOO_SHORT = "Data at path too short";
   public static final String E_MISSING_MARKER_STRING =
       "Missing marker string: ";
 
-  private final Class<T> classType;
-  private final ObjectMapper mapper;
-
   /**
    * Create an instance bound to a specific type
    * @param classType class to marshall
    */
   public JsonSerDeser(Class<T> classType) {
-    Preconditions.checkArgument(classType != null, "null classType");
-    this.classType = classType;
-    this.mapper = new ObjectMapper();
-    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-  }
-
-  /**
-   * Get the simple name of the class type to be marshalled
-   * @return the name of the class being marshalled
-   */
-  public String getName() {
-    return classType.getSimpleName();
-  }
-
-  /**
-   * Convert from JSON
-   *
-   * @param json input
-   * @return the parsed JSON
-   * @throws IOException IO problems
-   * @throws JsonParseException If the input is not well-formatted
-   * @throws JsonMappingException failure to map from the JSON to this class
-   */
-  @SuppressWarnings("unchecked")
-  public synchronized T fromJson(String json)
-      throws IOException, JsonParseException, JsonMappingException {
-    try {
-      return mapper.readValue(json, classType);
-    } catch (IOException e) {
-      LOG.error("Exception while parsing json : " + e + "\n" + json, e);
-      throw e;
-    }
-  }
-
-  /**
-   * Convert from a JSON file
-   * @param jsonFile input file
-   * @return the parsed JSON
-   * @throws IOException IO problems
-   * @throws JsonParseException If the input is not well-formatted
-   * @throws JsonMappingException failure to map from the JSON to this class
-   */
-  @SuppressWarnings("unchecked")
-  public synchronized T fromFile(File jsonFile)
-      throws IOException, JsonParseException, JsonMappingException {
-    try {
-      return mapper.readValue(jsonFile, classType);
-    } catch (IOException e) {
-      LOG.error("Exception while parsing json file {}: {}", jsonFile, e);
-      throw e;
-    }
-  }
-
-  /**
-   * Convert from a JSON file
-   * @param resource input file
-   * @return the parsed JSON
-   * @throws IOException IO problems
-   * @throws JsonParseException If the input is not well-formatted
-   * @throws JsonMappingException failure to map from the JSON to this class
-   */
-  @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed"})
-  public synchronized T fromResource(String resource)
-      throws IOException, JsonParseException, JsonMappingException {
-    InputStream resStream = null;
-    try {
-      resStream = this.getClass().getResourceAsStream(resource);
-      if (resStream == null) {
-        throw new FileNotFoundException(resource);
-      }
-      return mapper.readValue(resStream, classType);
-    } catch (IOException e) {
-      LOG.error("Exception while parsing json resource {}: {}", resource, e);
-      throw e;
-    } finally {
-      IOUtils.closeStream(resStream);
-    }
-  }
-
-  /**
-   * clone by converting to JSON and back again.
-   * This is much less efficient than any Java clone process.
-   * @param instance instance to duplicate
-   * @return a new instance
-   * @throws IOException problems.
-   */
-  public T fromInstance(T instance) throws IOException {
-    return fromJson(toJson(instance));
-  }
-
-  /**
-   * Load from a Hadoop filesystem
-   * @param fs filesystem
-   * @param path path
-   * @return a loaded CD
-   * @throws IOException IO problems
-   * @throws EOFException if not enough bytes were read in
-   * @throws JsonParseException parse problems
-   * @throws JsonMappingException O/J mapping problems
-   */
-  public T load(FileSystem fs, Path path)
-      throws IOException, JsonParseException, JsonMappingException {
-    FileStatus status = fs.getFileStatus(path);
-    long len = status.getLen();
-    byte[] b = new byte[(int) len];
-    FSDataInputStream dataInputStream = fs.open(path);
-    int count = dataInputStream.read(b);
-    if (count != len) {
-      throw new EOFException(path.toString() + ": read finished prematurely");
-    }
-    return fromBytes(path.toString(), b);
-  }
-
-  /**
-   * Save a cluster description to a hadoop filesystem
-   * @param fs filesystem
-   * @param path path
-   * @param overwrite should any existing file be overwritten
-   * @throws IOException IO exception
-   */
-  public void save(FileSystem fs, Path path, T instance,
-      boolean overwrite) throws
-      IOException {
-    FSDataOutputStream dataOutputStream = fs.create(path, overwrite);
-    writeJsonAsBytes(instance, dataOutputStream);
-  }
-
-  /**
-   * Write the json as bytes -then close the file
-   * @param dataOutputStream an outout stream that will always be closed
-   * @throws IOException on any failure
-   */
-  private void writeJsonAsBytes(T instance,
-      DataOutputStream dataOutputStream) throws IOException {
-    try {
-      byte[] b = toBytes(instance);
-      dataOutputStream.write(b);
-    } finally {
-      dataOutputStream.close();
-    }
-  }
-
-  /**
-   * Convert JSON To bytes
-   * @param instance instance to convert
-   * @return a byte array
-   * @throws IOException
-   */
-  public byte[] toBytes(T instance) throws IOException {
-    String json = toJson(instance);
-    return json.getBytes(UTF_8);
+    super(classType, false, false);
   }
 
   /**
@@ -233,9 +68,10 @@ public class JsonSerDeser<T> {
    * @throws IOException all problems
    * @throws EOFException not enough data
    * @throws InvalidRecordException if the parsing failed -the record is invalid
+   * @throws NoRecordException if the data is not considered a record: either
+   * it is too short or it did not contain the marker string.
    */
-  public T fromBytes(String path, byte[] bytes) throws IOException,
-      InvalidRecordException {
+  public T fromBytes(String path, byte[] bytes) throws IOException {
     return fromBytes(path, bytes, "");
   }
 
@@ -258,7 +94,7 @@ public class JsonSerDeser<T> {
    * it is too short or it did not contain the marker string.
    */
   public T fromBytes(String path, byte[] bytes, String marker)
-      throws IOException, NoRecordException, InvalidRecordException {
+      throws IOException {
     int len = bytes.length;
     if (len == 0 ) {
       throw new NoRecordException(path, E_NO_DATA);
@@ -278,30 +114,4 @@ public class JsonSerDeser<T> {
     }
   }
 
-  /**
-   * Convert an instance to a JSON string
-   * @param instance instance to convert
-   * @return a JSON string description
-   * @throws JsonProcessingException Json generation problems
-   */
-  public synchronized String toJson(T instance) throws JsonProcessingException {
-    mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
-    return mapper.writeValueAsString(instance);
-  }
-
-  /**
-   * Convert an instance to a string form for output. This is a robust
-   * operation which will convert any JSON-generating exceptions into
-   * error text.
-   * @param instance non-null instance
-   * @return a JSON string
-   */
-  public String toString(T instance) {
-    Preconditions.checkArgument(instance != null, "Null instance argument");
-    try {
-      return toJson(instance);
-    } catch (JsonProcessingException e) {
-      return "Failed to convert to a string: " + e;
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message