Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 55F70200D41 for ; Wed, 22 Nov 2017 18:20:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 530BF160C11; Wed, 22 Nov 2017 17:20:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E98B1160BEC for ; Wed, 22 Nov 2017 18:20:51 +0100 (CET) Received: (qmail 52718 invoked by uid 500); 22 Nov 2017 17:20:51 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 52709 invoked by uid 99); 22 Nov 2017 17:20:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Nov 2017 17:20:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D620DDFD78; Wed, 22 Nov 2017 17:20:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stevel@apache.org To: common-commits@hadoop.apache.org Date: Wed, 22 Nov 2017 17:20:50 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [01/14] hadoop git commit: HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue. archived-at: Wed, 22 Nov 2017 17:20:53 -0000 Repository: hadoop Updated Branches: refs/heads/trunk 782ba3bf9 -> de8b6ca5e http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 230dbad..02236eb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -26,6 +26,7 @@ import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEventType; import com.amazonaws.event.ProgressListener; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; @@ -70,16 +71,26 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { private int uploadBlockSize = DEFAULT_UPLOAD_BLOCKSIZE; private int partitionSize; + private long filesize; @Override public void setup() throws Exception { super.setup(); - final Path testPath = getTestPath(); - scaleTestDir = new Path(testPath, "scale"); + scaleTestDir = new Path(getTestPath(), getTestSuiteName()); hugefile = new Path(scaleTestDir, "hugefile"); hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed"); + filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE, + DEFAULT_HUGE_FILESIZE); } + /** + * Get the name of this test suite, which is used in path generation. 
+ * Base implementation uses {@link #getBlockOutputBufferName()} for this. + * @return the name of the suite. + */ + public String getTestSuiteName() { + return getBlockOutputBufferName(); + } /** * Note that this can get called before test setup. @@ -88,7 +99,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { @Override protected Configuration createScaleConfiguration() { Configuration conf = super.createScaleConfiguration(); - partitionSize = (int)getTestPropertyBytes(conf, + partitionSize = (int) getTestPropertyBytes(conf, KEY_HUGE_PARTITION_SIZE, DEFAULT_PARTITION_SIZE); assertTrue("Partition size too small: " + partitionSize, @@ -99,6 +110,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { conf.setInt(MULTIPART_SIZE, partitionSize); conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate"); conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); + S3ATestUtils.disableFilesystemCaching(conf); return conf; } @@ -111,17 +123,16 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { @Test public void test_010_CreateHugeFile() throws IOException { assertFalse("Please run this test sequentially to avoid timeouts" + - " and bandwidth problems", isParallelExecution()); - long filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE, - DEFAULT_HUGE_FILESIZE); + " and bandwidth problems", isParallelExecution()); long filesizeMB = filesize / _1MB; // clean up from any previous attempts deleteHugeFile(); + Path fileToCreate = getPathOfFileToCreate(); describe("Creating file %s of size %d MB" + " with partition size %d buffered by %s", - hugefile, filesizeMB, partitionSize, getBlockOutputBufferName()); + fileToCreate, filesizeMB, partitionSize, getBlockOutputBufferName()); // now do a check of available upload time, with a pessimistic bandwidth // (that of remote upload tests). If the test times out then not only is @@ -134,7 +145,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { assertTrue(String.format("Timeout set in %s seconds is too low;" + " estimating upload time of %d seconds at 1 MB/s." 
+ " Rerun tests with -D%s=%d", - timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2), + timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2), uploadTime < timeout); assertEquals("File size set in " + KEY_HUGE_FILESIZE + " = " + filesize + " is not a multiple of " + uploadBlockSize, @@ -162,7 +173,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { S3AInstrumentation.OutputStreamStatistics streamStatistics; long blocksPer10MB = blocksPerMB * 10; ProgressCallback progress = new ProgressCallback(timer); - try (FSDataOutputStream out = fs.create(hugefile, + try (FSDataOutputStream out = fs.create(fileToCreate, true, uploadBlockSize, progress)) { @@ -219,14 +230,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { toHuman(timer.nanosPerOperation(putRequestCount))); assertEquals("active put requests in \n" + fs, 0, gaugeValue(putRequestsActive)); - ContractTestUtils.assertPathExists(fs, "Huge file", hugefile); - FileStatus status = fs.getFileStatus(hugefile); - ContractTestUtils.assertIsFile(hugefile, status); - assertEquals("File size in " + status, filesize, status.getLen()); - if (progress != null) { - progress.verifyNoFailures("Put file " + hugefile - + " of size " + filesize); - } + progress.verifyNoFailures( + "Put file " + fileToCreate + " of size " + filesize); if (streamStatistics != null) { assertEquals("actively allocated blocks in " + streamStatistics, 0, streamStatistics.blocksActivelyAllocated()); @@ -234,6 +239,39 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { } /** + * Get the path of the file which is to created. This is normally + * {@link #hugefile} + * @return the path to use when creating the file. + */ + protected Path getPathOfFileToCreate() { + return this.hugefile; + } + + protected Path getScaleTestDir() { + return scaleTestDir; + } + + protected Path getHugefile() { + return hugefile; + } + + public void setHugefile(Path hugefile) { + this.hugefile = hugefile; + } + + protected Path getHugefileRenamed() { + return hugefileRenamed; + } + + protected int getUploadBlockSize() { + return uploadBlockSize; + } + + protected int getPartitionSize() { + return partitionSize; + } + + /** * Progress callback from AWS. Likely to come in on a different thread. */ private final class ProgressCallback implements Progressable, @@ -294,22 +332,54 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { } private void verifyNoFailures(String operation) { - assertEquals("Failures in " + operation +": " + this, 0, failures.get()); + assertEquals("Failures in " + operation + ": " + this, 0, failures.get()); } } + /** + * Assume that the huge file exists; skip the test if it does not. + * @throws IOException IO failure + */ void assumeHugeFileExists() throws IOException { + assumeFileExists(this.hugefile); + } + + /** + * Assume a specific file exists. 
+ * @param file file to look for + * @throws IOException IO problem + */ + private void assumeFileExists(Path file) throws IOException { S3AFileSystem fs = getFileSystem(); - ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile); - FileStatus status = fs.getFileStatus(hugefile); - ContractTestUtils.assertIsFile(hugefile, status); - assertTrue("File " + hugefile + " is empty", status.getLen() > 0); + ContractTestUtils.assertPathExists(fs, "huge file not created", + file); + FileStatus status = fs.getFileStatus(file); + ContractTestUtils.assertIsFile(file, status); + assertTrue("File " + file + " is empty", status.getLen() > 0); } private void logFSState() { LOG.info("File System state after operation:\n{}", getFileSystem()); } + /** + * This is the set of actions to perform when verifying the file actually + * was created. With the s3guard committer, the file doesn't come into + * existence; a different set of assertions must be checked. + */ + @Test + public void test_030_postCreationAssertions() throws Throwable { + S3AFileSystem fs = getFileSystem(); + ContractTestUtils.assertPathExists(fs, "Huge file", hugefile); + FileStatus status = fs.getFileStatus(hugefile); + ContractTestUtils.assertIsFile(hugefile, status); + assertEquals("File size in " + status, filesize, status.getLen()); + } + + /** + * Read in the file using Positioned read(offset) calls. + * @throws Throwable failure + */ @Test public void test_040_PositionedReadHugeFile() throws Throwable { assumeHugeFileExists(); @@ -323,11 +393,11 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { describe("Positioned reads of %s %s", filetype, hugefile); S3AFileSystem fs = getFileSystem(); FileStatus status = fs.getFileStatus(hugefile); - long filesize = status.getLen(); + long size = status.getLen(); int ops = 0; final int bufferSize = 8192; byte[] buffer = new byte[bufferSize]; - long eof = filesize - 1; + long eof = size - 1; ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF; @@ -348,23 +418,27 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { ops++; LOG.info("Final stream state: {}", in); } - long mb = Math.max(filesize / _1MB, 1); + long mb = Math.max(size / _1MB, 1); logFSState(); - timer.end("time to performed positioned reads of %s of %d MB ", + timer.end("time to perform positioned reads of %s of %d MB ", filetype, mb); LOG.info("Time per positioned read = {} nS", toHuman(timer.nanosPerOperation(ops))); } + /** + * Read in the entire file using read() calls. 
+ * @throws Throwable failure + */ @Test public void test_050_readHugeFile() throws Throwable { assumeHugeFileExists(); describe("Reading %s", hugefile); S3AFileSystem fs = getFileSystem(); FileStatus status = fs.getFileStatus(hugefile); - long filesize = status.getLen(); - long blocks = filesize / uploadBlockSize; + long size = status.getLen(); + long blocks = size / uploadBlockSize; byte[] data = new byte[uploadBlockSize]; ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); @@ -375,11 +449,11 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { LOG.info("Final stream state: {}", in); } - long mb = Math.max(filesize / _1MB, 1); + long mb = Math.max(size / _1MB, 1); timer.end("time to read file of %d MB ", mb); LOG.info("Time per MB to read = {} nS", toHuman(timer.nanosPerOperation(mb))); - bandwidth(timer, filesize); + bandwidth(timer, size); logFSState(); } @@ -389,18 +463,18 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { describe("renaming %s to %s", hugefile, hugefileRenamed); S3AFileSystem fs = getFileSystem(); FileStatus status = fs.getFileStatus(hugefile); - long filesize = status.getLen(); + long size = status.getLen(); fs.delete(hugefileRenamed, false); ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); fs.rename(hugefile, hugefileRenamed); - long mb = Math.max(filesize / _1MB, 1); + long mb = Math.max(size / _1MB, 1); timer.end("time to rename file of %d MB", mb); LOG.info("Time per MB to rename = {} nS", toHuman(timer.nanosPerOperation(mb))); - bandwidth(timer, filesize); + bandwidth(timer, size); logFSState(); FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed); - assertEquals(filesize, destFileStatus.getLen()); + assertEquals(size, destFileStatus.getLen()); // rename back ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer(); @@ -408,24 +482,49 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { timer2.end("Renaming back"); LOG.info("Time per MB to rename = {} nS", toHuman(timer2.nanosPerOperation(mb))); - bandwidth(timer2, filesize); + bandwidth(timer2, size); } + /** + * Cleanup: delete the files. + */ @Test - public void test_999_DeleteHugeFiles() throws IOException { - deleteHugeFile(); - ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer(); - S3AFileSystem fs = getFileSystem(); - fs.delete(hugefileRenamed, false); - timer2.end("time to delete %s", hugefileRenamed); - ContractTestUtils.rm(fs, getTestPath(), true, true); + public void test_800_DeleteHugeFiles() throws IOException { + try { + deleteHugeFile(); + delete(hugefileRenamed, false); + } finally { + ContractTestUtils.rm(getFileSystem(), getTestPath(), true, false); + } + } + + /** + * After all the work, dump the statistics. + */ + @Test + public void test_900_dumpStats() { + StringBuilder sb = new StringBuilder(); + + getFileSystem().getStorageStatistics() + .forEach(kv -> sb.append(kv.toString()).append("\n")); + + LOG.info("Statistics\n{}", sb); } protected void deleteHugeFile() throws IOException { - describe("Deleting %s", hugefile); - NanoTimer timer = new NanoTimer(); - getFileSystem().delete(hugefile, false); - timer.end("time to delete %s", hugefile); + delete(hugefile, false); + } + + /** + * Delete any file, time how long it took. 
+ * @param path path to delete + * @param recursive recursive flag + */ + protected void delete(Path path, boolean recursive) throws IOException { + describe("Deleting %s", path); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + getFileSystem().delete(path, recursive); + timer.end("time to delete %s", path); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml index e8200da..b68f559 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml +++ b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml @@ -27,7 +27,7 @@ hadoop.tmp.dir target/build/test A base for other temporary directories. - true + false @@ -36,6 +36,13 @@ The endpoint for s3a://landsat-pds URLs + + fs.s3a.bucket.landsat-pds.multipart.purge + false + Don't try to purge uploads in the read-only bucket, as + it will only create log noise. + + fs.s3a.bucket.landsat-pds.metadatastore.impl http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index acbe7f1..383dec8 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -18,15 +18,50 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR +log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN +log4j.logger.org.apache.hadoop.util.GSet=WARN +# MiniDFS clusters can be noisy +log4j.logger.org.apache.hadoop.hdfs.server=ERROR +log4j.logger.org.apache.hadoop.metrics2=WARN +log4j.logger.org.apache.hadoop.net.NetworkTopology=WARN +log4j.logger.org.apache.hadoop.util.JvmPauseMonitor=WARN +log4j.logger.org.apache.hadoop.ipc=WARN +log4j.logger.org.apache.hadoop.http=WARN +log4j.logger.org.apache.hadoop.security.authentication.server.AuthenticationFilter=WARN +log4j.logger.org.apache.hadoop.util.HostsFileReader=WARN +log4j.logger.org.apache.commons.beanutils=WARN +log4j.logger.org.apache.hadoop.hdfs.StateChange=WARN +log4j.logger.BlockStateChange=WARN +log4j.logger.org.apache.hadoop.hdfs.DFSUtil=WARN + +## YARN can be noisy too +log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.scheduler=WARN +log4j.logger.org.apache.hadoop.yarn.server.nodemanager=WARN +log4j.logger.org.apache.hadoop.yarn.event=WARN +log4j.logger.org.apache.hadoop.yarn.util.ResourceCalculatorPlugin=ERROR +log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor=WARN +log4j.logger.org.apache.hadoop.mapred.IndexCache=WARN +log4j.logger.org.apache.hadoop.yarn.webapp.WebApps=WARN +log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.security=WARN +log4j.logger.org.apache.hadoop.yarn.util.AbstractLivelinessMonitor=WARN +log4j.logger.org.apache.hadoop.security.token.delegation=WARN +log4j.logger.org.apache.hadoop.mapred.ShuffleHandler=WARN +log4j.logger.org.apache.hadoop.ipc.Server=WARN +#log4j.logger.=WARN + # for debugging low level S3a operations, uncomment these 
lines # Log all S3A classes #log4j.logger.org.apache.hadoop.fs.s3a=DEBUG - +#log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO +#log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO # Log S3Guard classes #log4j.logger.org.apache.hadoop.fs.s3a.s3guard=DEBUG -# Enable debug logging of AWS DynamoDB client +# Log Committer classes +#log4j.logger.org.apache.hadoop.fs.s3a.commit=DEBUG + +# Enable debug logging of AWS Dynamo client #log4j.logger.com.amazonaws.services.dynamodbv2.AmazonDynamoDB=DEBUG # Log all HTTP requests made; includes S3 interaction. This may @@ -35,3 +70,6 @@ log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR # Turn on low level HTTP protocol debugging #log4j.logger.com.amazonaws.thirdparty.apache.http=DEBUG + +log4j.logger.org.apache.hadoop.mapreduce.lib.output=DEBUG +log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java index eddff20..491bcf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java @@ -18,212 +18,47 @@ package org.apache.hadoop.registry.client.binding; -import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; import org.apache.hadoop.registry.client.exceptions.NoRecordException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.JsonSerialization; -import java.io.DataOutputStream; import java.io.EOFException; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; /** * Support for marshalling objects to and from JSON. *
- * It constructs an object mapper as an instance field. - * and synchronizes access to those methods - * which use the mapper + * This extends {@link JsonSerialization} with the notion + * of a marker field in the JSON file, with + * <ol> + *   <li>a fail-fast check for it before even trying to parse.</li> + *   <li>Specific IOException subclasses for a failure.</li> + * </ol>
+ * The rationale for this is not only to support different things in the, + * registry, but the fact that all ZK nodes have a size > 0 when examined. + * * @param Type to marshal. */ -@InterfaceAudience.Private() +@InterfaceAudience.Private @InterfaceStability.Evolving -public class JsonSerDeser { +public class JsonSerDeser extends JsonSerialization { - private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class); private static final String UTF_8 = "UTF-8"; public static final String E_NO_DATA = "No data at path"; public static final String E_DATA_TOO_SHORT = "Data at path too short"; public static final String E_MISSING_MARKER_STRING = "Missing marker string: "; - private final Class classType; - private final ObjectMapper mapper; - /** * Create an instance bound to a specific type * @param classType class to marshall */ public JsonSerDeser(Class classType) { - Preconditions.checkArgument(classType != null, "null classType"); - this.classType = classType; - this.mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - /** - * Get the simple name of the class type to be marshalled - * @return the name of the class being marshalled - */ - public String getName() { - return classType.getSimpleName(); - } - - /** - * Convert from JSON - * - * @param json input - * @return the parsed JSON - * @throws IOException IO problems - * @throws JsonParseException If the input is not well-formatted - * @throws JsonMappingException failure to map from the JSON to this class - */ - @SuppressWarnings("unchecked") - public synchronized T fromJson(String json) - throws IOException, JsonParseException, JsonMappingException { - try { - return mapper.readValue(json, classType); - } catch (IOException e) { - LOG.error("Exception while parsing json : " + e + "\n" + json, e); - throw e; - } - } - - /** - * Convert from a JSON file - * @param jsonFile input file - * @return the parsed JSON - * @throws IOException IO problems - * @throws JsonParseException If the input is not well-formatted - * @throws JsonMappingException failure to map from the JSON to this class - */ - @SuppressWarnings("unchecked") - public synchronized T fromFile(File jsonFile) - throws IOException, JsonParseException, JsonMappingException { - try { - return mapper.readValue(jsonFile, classType); - } catch (IOException e) { - LOG.error("Exception while parsing json file {}: {}", jsonFile, e); - throw e; - } - } - - /** - * Convert from a JSON file - * @param resource input file - * @return the parsed JSON - * @throws IOException IO problems - * @throws JsonParseException If the input is not well-formatted - * @throws JsonMappingException failure to map from the JSON to this class - */ - @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed"}) - public synchronized T fromResource(String resource) - throws IOException, JsonParseException, JsonMappingException { - InputStream resStream = null; - try { - resStream = this.getClass().getResourceAsStream(resource); - if (resStream == null) { - throw new FileNotFoundException(resource); - } - return mapper.readValue(resStream, classType); - } catch (IOException e) { - LOG.error("Exception while parsing json resource {}: {}", resource, e); - throw e; - } finally { - IOUtils.closeStream(resStream); - } - } - - /** - * clone by converting to JSON and back again. - * This is much less efficient than any Java clone process. - * @param instance instance to duplicate - * @return a new instance - * @throws IOException problems. 
- */ - public T fromInstance(T instance) throws IOException { - return fromJson(toJson(instance)); - } - - /** - * Load from a Hadoop filesystem - * @param fs filesystem - * @param path path - * @return a loaded CD - * @throws IOException IO problems - * @throws EOFException if not enough bytes were read in - * @throws JsonParseException parse problems - * @throws JsonMappingException O/J mapping problems - */ - public T load(FileSystem fs, Path path) - throws IOException, JsonParseException, JsonMappingException { - FileStatus status = fs.getFileStatus(path); - long len = status.getLen(); - byte[] b = new byte[(int) len]; - FSDataInputStream dataInputStream = fs.open(path); - int count = dataInputStream.read(b); - if (count != len) { - throw new EOFException(path.toString() + ": read finished prematurely"); - } - return fromBytes(path.toString(), b); - } - - /** - * Save a cluster description to a hadoop filesystem - * @param fs filesystem - * @param path path - * @param overwrite should any existing file be overwritten - * @throws IOException IO exception - */ - public void save(FileSystem fs, Path path, T instance, - boolean overwrite) throws - IOException { - FSDataOutputStream dataOutputStream = fs.create(path, overwrite); - writeJsonAsBytes(instance, dataOutputStream); - } - - /** - * Write the json as bytes -then close the file - * @param dataOutputStream an outout stream that will always be closed - * @throws IOException on any failure - */ - private void writeJsonAsBytes(T instance, - DataOutputStream dataOutputStream) throws IOException { - try { - byte[] b = toBytes(instance); - dataOutputStream.write(b); - } finally { - dataOutputStream.close(); - } - } - - /** - * Convert JSON To bytes - * @param instance instance to convert - * @return a byte array - * @throws IOException - */ - public byte[] toBytes(T instance) throws IOException { - String json = toJson(instance); - return json.getBytes(UTF_8); + super(classType, false, false); } /** @@ -233,9 +68,10 @@ public class JsonSerDeser { * @throws IOException all problems * @throws EOFException not enough data * @throws InvalidRecordException if the parsing failed -the record is invalid + * @throws NoRecordException if the data is not considered a record: either + * it is too short or it did not contain the marker string. */ - public T fromBytes(String path, byte[] bytes) throws IOException, - InvalidRecordException { + public T fromBytes(String path, byte[] bytes) throws IOException { return fromBytes(path, bytes, ""); } @@ -258,7 +94,7 @@ public class JsonSerDeser { * it is too short or it did not contain the marker string. */ public T fromBytes(String path, byte[] bytes, String marker) - throws IOException, NoRecordException, InvalidRecordException { + throws IOException { int len = bytes.length; if (len == 0 ) { throw new NoRecordException(path, E_NO_DATA); @@ -278,30 +114,4 @@ public class JsonSerDeser { } } - /** - * Convert an instance to a JSON string - * @param instance instance to convert - * @return a JSON string description - * @throws JsonProcessingException Json generation problems - */ - public synchronized String toJson(T instance) throws JsonProcessingException { - mapper.configure(SerializationFeature.INDENT_OUTPUT, true); - return mapper.writeValueAsString(instance); - } - - /** - * Convert an instance to a string form for output. This is a robust - * operation which will convert any JSON-generating exceptions into - * error text. 
- * @param instance non-null instance - * @return a JSON string - */ - public String toString(T instance) { - Preconditions.checkArgument(instance != null, "Null instance argument"); - try { - return toJson(instance); - } catch (JsonProcessingException e) { - return "Failed to convert to a string: " + e; - } - } }
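
Note (illustrative, not part of this patch): the reworked JsonSerDeser now delegates the actual JSON marshalling to org.apache.hadoop.util.JsonSerialization and keeps only the registry-specific marker handling. A minimal sketch of how a caller might exercise the fail-fast marker check follows; the ServiceRecord type and the "JSONServiceRecord" marker string are assumptions drawn from the YARN registry, not from this diff.

// Hedged usage sketch (not part of this patch): exercising the marker-based
// fail-fast check. ServiceRecord and the "JSONServiceRecord" marker are
// assumptions drawn from the YARN registry, shown for illustration only.
import org.apache.hadoop.registry.client.binding.JsonSerDeser;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.ServiceRecord;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class JsonSerDeserMarkerSketch {
  public static void main(String[] args) throws IOException {
    JsonSerDeser<ServiceRecord> marshal =
        new JsonSerDeser<>(ServiceRecord.class);

    // Bytes as they might be read back from a ZK node.
    byte[] bytes = "{\"type\": \"JSONServiceRecord\"}"
        .getBytes(StandardCharsets.UTF_8);
    try {
      ServiceRecord record =
          marshal.fromBytes("/registry/example", bytes, "JSONServiceRecord");
      System.out.println("Parsed record: " + record);
    } catch (NoRecordException e) {
      // Raised when the data is empty, too short, or missing the marker,
      // before any JSON parsing is attempted.
      System.err.println("Not a record: " + e);
    }
  }
}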
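
Note (illustrative, not part of this patch): on the hadoop-aws side, the refactored AbstractSTestS3AHugeFiles exposes getTestSuiteName() and getPathOfFileToCreate() as extension points. A minimal sketch of a concrete suite using the new naming hook is below; the class name and suite name are assumptions for illustration.

package org.apache.hadoop.fs.s3a.scale;

import org.apache.hadoop.fs.s3a.Constants;

/**
 * Sketch only: a huge-files suite buffering multipart blocks in memory.
 */
public class STestS3AHugeFilesArrayBlocksSketch
    extends AbstractSTestS3AHugeFiles {

  @Override
  protected String getBlockOutputBufferName() {
    // buffer upload blocks in byte arrays on the heap
    return Constants.FAST_UPLOAD_BUFFER_ARRAY;
  }

  @Override
  public String getTestSuiteName() {
    // keeps this suite's "hugefile" data separate from other suites' data
    return "array-blocks-sketch";
  }
}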