Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1DD24200C69 for ; Sat, 22 Apr 2017 01:38:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1C5C4160BB5; Fri, 21 Apr 2017 23:38:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4D85F160B97 for ; Sat, 22 Apr 2017 01:38:21 +0200 (CEST) Received: (qmail 42119 invoked by uid 500); 21 Apr 2017 23:38:20 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 42106 invoked by uid 99); 21 Apr 2017 23:38:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Apr 2017 23:38:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 554D1ED4A1; Fri, 21 Apr 2017 23:38:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: lcwik@apache.org To: commits@beam.apache.org Date: Fri, 21 Apr 2017 23:38:20 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/4] beam git commit: [BEAM-1871] Move Xml IO and related classes to new sdks/java/io/xml package. archived-at: Fri, 21 Apr 2017 23:38:23 -0000 Repository: beam Updated Branches: refs/heads/master 022d5b657 -> 62f041e56 http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java new file mode 100644 index 0000000..9c5089a --- /dev/null +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading and writing Xml files. + */ +package org.apache.beam.sdk.io.xml; http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java new file mode 100644 index 0000000..5f1330d --- /dev/null +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.xml; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link JAXBCoder}. */ +@RunWith(JUnit4.class) +public class JAXBCoderTest { + + @XmlRootElement + static class TestType { + private String testString = null; + private int testInt; + + public TestType() {} + + public TestType(String testString, int testInt) { + this.testString = testString; + this.testInt = testInt; + } + + public String getTestString() { + return testString; + } + + public void setTestString(String testString) { + this.testString = testString; + } + + public int getTestInt() { + return testInt; + } + + public void setTestInt(int testInt) { + this.testInt = testInt; + } + + @Override + public int hashCode() { + int hashCode = 1; + hashCode = 31 * hashCode + (testString == null ? 0 : testString.hashCode()); + hashCode = 31 * hashCode + testInt; + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TestType)) { + return false; + } + + TestType other = (TestType) obj; + return (testString == null || testString.equals(other.testString)) + && (testInt == other.testInt); + } + } + + @Test + public void testEncodeDecodeOuter() throws Exception { + JAXBCoder coder = JAXBCoder.of(TestType.class); + + byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999)); + assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded)); + } + + @Test + public void testEncodeDecodeAfterClone() throws Exception { + JAXBCoder coder = SerializableUtils.clone(JAXBCoder.of(TestType.class)); + + byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999)); + assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded)); + } + + @Test + public void testEncodeDecodeNested() throws Exception { + JAXBCoder jaxbCoder = JAXBCoder.of(TestType.class); + TestCoder nesting = new TestCoder(jaxbCoder); + + byte[] encoded = CoderUtils.encodeToByteArray(nesting, new TestType("abc", 9999)); + assertEquals( + new TestType("abc", 9999), CoderUtils.decodeFromByteArray(nesting, encoded)); + } + + @Test + public void testEncodeDecodeMultithreaded() throws Throwable { + final JAXBCoder coder = JAXBCoder.of(TestType.class); + int numThreads = 100; + + final CountDownLatch ready = new CountDownLatch(numThreads); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(numThreads); + + final AtomicReference thrown = new AtomicReference<>(); + + Executor executor = Executors.newCachedThreadPool(); + for (int i = 0; i < numThreads; i++) { + final TestType elem = new TestType("abc", i); + final int index = i; + executor.execute( + new Runnable() { + @Override + public void run() { + ready.countDown(); + try { + start.await(); + } catch (InterruptedException e) { + } + + try { + byte[] encoded = CoderUtils.encodeToByteArray(coder, elem); + assertEquals( + new TestType("abc", index), CoderUtils.decodeFromByteArray(coder, encoded)); + } catch (Throwable e) { + thrown.compareAndSet(null, e); + } + done.countDown(); + } + }); + } + ready.await(); + start.countDown(); + + done.await(); + Throwable actuallyThrown = thrown.get(); + if (actuallyThrown != null) { + throw actuallyThrown; + } + } + + /** + * A coder that surrounds the value with two values, to demonstrate nesting. + */ + private static class TestCoder extends StandardCoder { + private final JAXBCoder jaxbCoder; + public TestCoder(JAXBCoder jaxbCoder) { + this.jaxbCoder = jaxbCoder; + } + + @Override + public void encode(TestType value, OutputStream outStream, Context context) + throws CoderException, IOException { + Context nestedContext = context.nested(); + VarIntCoder.of().encode(3, outStream, nestedContext); + jaxbCoder.encode(value, outStream, nestedContext); + VarLongCoder.of().encode(22L, outStream, context); + } + + @Override + public TestType decode(InputStream inStream, Context context) + throws CoderException, IOException { + Context nestedContext = context.nested(); + VarIntCoder.of().decode(inStream, nestedContext); + TestType result = jaxbCoder.decode(inStream, nestedContext); + VarLongCoder.of().decode(inStream, context); + return result; + } + + @Override + public List> getCoderArguments() { + return ImmutableList.of(jaxbCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + jaxbCoder.verifyDeterministic(); + } + } + + @Test + public void testEncodable() throws Exception { + CoderProperties.coderSerializable(JAXBCoder.of(TestType.class)); + } + + @Test + public void testEncodingId() throws Exception { + Coder coder = JAXBCoder.of(TestType.class); + CoderProperties.coderHasEncodingId( + coder, TestType.class.getName()); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + assertThat( + JAXBCoder.of(TestType.class).getEncodedTypeDescriptor(), + equalTo(TypeDescriptor.of(TestType.class))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java new file mode 100644 index 0000000..a6e1b87 --- /dev/null +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.xml; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.common.collect.Lists; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; +import org.apache.beam.sdk.io.xml.XmlSink.XmlWriteOperation; +import org.apache.beam.sdk.io.xml.XmlSink.XmlWriter; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for XmlSink. + */ +@RunWith(JUnit4.class) +public class XmlSinkTest { + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private String testRootElement = "testElement"; + private String testFilePrefix = "/path/to/testPrefix"; + + /** + * An XmlWriter correctly writes objects as Xml elements with an enclosing root element. + */ + @Test + public void testXmlWriter() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + XmlWriteOperation writeOp = + XmlIO.write() + .toFilenamePrefix(testFilePrefix) + .withRecordClass(Bird.class) + .withRootElement("birds") + .createSink() + .createWriteOperation(options); + XmlWriter writer = writeOp.createWriter(options); + + List bundle = + Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose")); + List lines = Arrays.asList("", "", "robin", + "bemused", "", "", "goose", + "evasive", "", ""); + runTestWrite(writer, bundle, lines); + } + + /** + * Builder methods correctly initialize an XML Sink. + */ + @Test + public void testBuildXmlWriteTransform() { + XmlIO.Write write = + XmlIO.write() + .toFilenamePrefix(testFilePrefix) + .withRecordClass(Bird.class) + .withRootElement(testRootElement); + assertEquals(Bird.class, write.getRecordClass()); + assertEquals(testRootElement, write.getRootElement()); + assertEquals(testFilePrefix, write.getFilenamePrefix()); + } + + /** Validation ensures no fields are missing. */ + @Test + public void testValidateXmlSinkMissingRecordClass() { + thrown.expect(NullPointerException.class); + XmlIO.write() + .withRootElement(testRootElement) + .toFilenamePrefix(testFilePrefix) + .validate(null); + } + + @Test + public void testValidateXmlSinkMissingRootElement() { + thrown.expect(NullPointerException.class); + XmlIO.write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null); + } + + @Test + public void testValidateXmlSinkMissingFilePrefix() { + thrown.expect(NullPointerException.class); + XmlIO.write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null); + } + + /** + * An XML Sink correctly creates an XmlWriteOperation. + */ + @Test + public void testCreateWriteOperations() { + PipelineOptions options = PipelineOptionsFactory.create(); + XmlSink sink = + XmlIO.write() + .withRecordClass(Bird.class) + .withRootElement(testRootElement) + .toFilenamePrefix(testFilePrefix) + .createSink(); + XmlWriteOperation writeOp = sink.createWriteOperation(options); + Path outputPath = new File(testFilePrefix).toPath(); + Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath(); + assertEquals(outputPath.getParent(), tempPath.getParent()); + assertThat( + tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); + } + + /** + * An XmlWriteOperation correctly creates an XmlWriter. + */ + @Test + public void testCreateWriter() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + XmlWriteOperation writeOp = + XmlIO.write() + .withRecordClass(Bird.class) + .withRootElement(testRootElement) + .toFilenamePrefix(testFilePrefix) + .createSink() + .createWriteOperation(options); + XmlWriter writer = writeOp.createWriter(options); + Path outputPath = new File(testFilePrefix).toPath(); + Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath(); + assertEquals(outputPath.getParent(), tempPath.getParent()); + assertThat( + tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); + assertNotNull(writer.marshaller); + } + + @Test + public void testDisplayData() { + XmlIO.Write write = XmlIO.write() + .toFilenamePrefix("foobar") + .withRootElement("bird") + .withRecordClass(Integer.class); + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml")); + assertThat(displayData, hasDisplayItem("rootElement", "bird")); + assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); + } + + /** + * Write a bundle with an XmlWriter and verify the output is expected. + */ + private void runTestWrite(XmlWriter writer, List bundle, List expected) + throws Exception { + File tmpFile = tmpFolder.newFile("foo.txt"); + try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile)) { + writeBundle(writer, bundle, fileOutputStream.getChannel()); + } + List lines = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) { + for (;;) { + String line = reader.readLine(); + if (line == null) { + break; + } + line = line.trim(); + if (line.length() > 0) { + lines.add(line); + } + } + assertEquals(expected, lines); + } + } + + /** + * Write a bundle with an XmlWriter. + */ + private void writeBundle(XmlWriter writer, List elements, WritableByteChannel channel) + throws Exception { + writer.prepareWrite(channel); + writer.writeHeader(); + for (T elem : elements) { + writer.write(elem); + } + writer.writeFooter(); + } + + /** + * Test JAXB annotated class. + */ + @SuppressWarnings("unused") + @XmlRootElement(name = "bird") + @XmlType(propOrder = {"name", "adjective"}) + private static final class Bird { + private String name; + private String adjective; + + @XmlElement(name = "species") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getAdjective() { + return adjective; + } + + public void setAdjective(String adjective) { + this.adjective = adjective; + } + + public Bird() {} + + public Bird(String adjective, String name) { + this.adjective = adjective; + this.name = name; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java new file mode 100644 index 0000000..5b33be3 --- /dev/null +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java @@ -0,0 +1,893 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.xml; + +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source.Reader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests XmlSource. + */ +@RunWith(JUnit4.class) +public class XmlSourceTest { + + @Rule + public TestPipeline p = TestPipeline.create(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + String tinyXML = + "ThomasHenry" + + "James"; + + String xmlWithMultiByteElementName = + "<දුම්රියන්><දුම්රිය>Thomas<දුම්රිය>Henry" + + "<දුම්රිය>James"; + + String xmlWithMultiByteChars = + "Thomas¥Hen¶ry" + + "Jamßes"; + + String trainXML = + "" + + "Thomas1blue" + + "Henry3green" + + "Toby7brown" + + "Gordon4blue" + + "Emily-1red" + + "Percy6green" + + ""; + + String trainXMLWithEmptyTags = + "" + + "" + + "Thomas1blue" + + "Henry3green" + + "" + + "Toby7brown" + + "Gordon4blue" + + "Emily-1red" + + "Percy6green" + + ""; + + String trainXMLWithAttributes = + "" + + "Thomas1blue" + + "Henry3green" + + "Toby7brown" + + "Gordon4blue" + + "Emily-1red" + + "Percy6green" + + ""; + + String trainXMLWithSpaces = + "" + + "Thomas 1blue" + + "Henry3green\n" + + "Toby7 brown " + + "Gordon 4blue\n\t" + + "Emily-1\tred" + + "\nPercy 6 green" + + ""; + + String trainXMLWithAllFeaturesMultiByte = + "<දුම්රියන්>" + + "<දුම්රිය/>" + + "<දුම්රිය size=\"small\"> Thomas¥1blue" + + "" + + "<දුම්රිය size=\"big\">He nry3green" + + "<දුම්රිය size=\"small\">Toby 7br¶own" + + "" + + "<දුම්රිය/>" + + "<දුම්රිය size=\"big\">Gordon4 blue" + + "<දුම්රිය size=\"small\">Emily-1red" + + "<දුම්රිය size=\"small\">Percy6green" + + "" + + ""; + + String trainXMLWithAllFeaturesSingleByte = + "" + + "" + + " Thomas1blue" + + "" + + "He nry3green" + + "Toby 7brown" + + "" + + "" + + "Gordon4 blue" + + "Emily-1red" + + "Percy6green" + + "" + + ""; + + @XmlRootElement + static class Train { + public static final int TRAIN_NUMBER_UNDEFINED = -1; + public String name = null; + public String color = null; + public int number = TRAIN_NUMBER_UNDEFINED; + + @XmlAttribute(name = "size") + public String size = null; + + public Train() {} + + public Train(String name, int number, String color, String size) { + this.name = name; + this.number = number; + this.color = color; + this.size = size; + } + + @Override + public int hashCode() { + int hashCode = 1; + hashCode = 31 * hashCode + (name == null ? 0 : name.hashCode()); + hashCode = 31 * hashCode + number; + hashCode = 31 * hashCode + (color == null ? 0 : name.hashCode()); + hashCode = 31 * hashCode + (size == null ? 0 : name.hashCode()); + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Train)) { + return false; + } + + Train other = (Train) obj; + return (name == null || name.equals(other.name)) && (number == other.number) + && (color == null || color.equals(other.color)) + && (size == null || size.equals(other.size)); + } + + @Override + public String toString() { + String str = "Train["; + boolean first = true; + if (name != null) { + str = str + "name=" + name; + first = false; + } + if (number != Integer.MIN_VALUE) { + if (!first) { + str = str + ","; + } + str = str + "number=" + number; + first = false; + } + if (color != null) { + if (!first) { + str = str + ","; + } + str = str + "color=" + color; + first = false; + } + if (size != null) { + if (!first) { + str = str + ","; + } + str = str + "size=" + size; + } + str = str + "]"; + return str; + } + } + + private List generateRandomTrainList(int size) { + String[] names = {"Thomas", "Henry", "Gordon", "Emily", "Toby", "Percy", "Mavis", "Edward", + "Bertie", "Harold", "Hiro", "Terence", "Salty", "Trevor"}; + int[] numbers = {-1, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + String[] colors = {"red", "blue", "green", "orange", "brown", "black", "white"}; + String[] sizes = {"small", "medium", "big"}; + + Random random = new Random(System.currentTimeMillis()); + + List trains = new ArrayList<>(); + for (int i = 0; i < size; i++) { + trains.add(new Train(names[random.nextInt(names.length - 1)], + numbers[random.nextInt(numbers.length - 1)], colors[random.nextInt(colors.length - 1)], + sizes[random.nextInt(sizes.length - 1)])); + } + + return trains; + } + + private String trainToXMLElement(Train train) { + return "" + train.name + "" + + train.number + "" + train.color + ""; + } + + private File createRandomTrainXML(String fileName, List trains) throws IOException { + File file = tempFolder.newFile(fileName); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + writer.write(""); + writer.newLine(); + for (Train train : trains) { + String str = trainToXMLElement(train); + writer.write(str); + writer.newLine(); + } + writer.write(""); + writer.newLine(); + } + return file; + } + + private List readEverythingFromReader(Reader reader) throws IOException { + List results = new ArrayList<>(); + for (boolean available = reader.start(); available; available = reader.advance()) { + Train train = reader.getCurrent(); + results.add(train); + } + return results; + } + + @Test + public void testReadXMLTiny() throws IOException { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List expectedResults = ImmutableList.of( + new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLWithMultiByteChars() throws IOException { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List expectedResults = ImmutableList.of( + new Train("Thomas¥", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Hen¶ry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Jamßes", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + @Ignore( + "Multi-byte characters in XML are not supported because the parser " + + "currently does not correctly report byte offsets") + public void testReadXMLWithMultiByteElementName() throws IOException { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("දුම්රියන්") + .withRecordElement("දුම්රිය") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List expectedResults = ImmutableList.of( + new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testSplitWithEmptyBundleAtEnd() throws Exception { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + List> splits = source.split(50, null); + + assertTrue(splits.size() > 2); + + List results = new ArrayList<>(); + for (BoundedSource split : splits) { + results.addAll(readEverythingFromReader(split.createReader(null))); + } + + List expectedResults = ImmutableList.of( + new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), containsInAnyOrder(trainsToStrings(results).toArray())); + } + + List trainsToStrings(List input) { + List strings = new ArrayList<>(); + for (Object data : input) { + strings.add(data.toString()); + } + return strings; + } + + @Test + public void testReadXMLSmall() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List expectedResults = + ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), + new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), + new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLNoRootElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRecordElement("train") + .withRecordClass(Train.class) + .createSource(); + + exception.expect(NullPointerException.class); + exception.expectMessage( + "rootElement is null. Use builder method withRootElement() to set this."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLNoRecordElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordClass(Train.class) + .createSource(); + + exception.expect(NullPointerException.class); + exception.expectMessage( + "recordElement is null. Use builder method withRecordElement() to set this."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLNoRecordClass() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .createSource(); + + exception.expect(NullPointerException.class); + exception.expectMessage( + "recordClass is null. Use builder method withRecordClass() to set this."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLIncorrectRootElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("something") + .withRecordElement("train") + .withRecordClass(Train.class) + .createSource(); + + exception.expectMessage("Unexpected close tag ; expected ."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLIncorrectRecordElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("something") + .withRecordClass(Train.class) + .createSource(); + + assertEquals(readEverythingFromReader(source.createReader(null)), new ArrayList()); + } + + @XmlRootElement + private static class WrongTrainType { + @SuppressWarnings("unused") + public String something; + } + + @Test + public void testReadXMLInvalidRecordClass() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(WrongTrainType.class) + .createSource(); + + exception.expect(RuntimeException.class); + + // JAXB internationalizes the error message. So this is all we can match for. + exception.expectMessage(both(containsString("name")).and(Matchers.containsString("something"))); + try (Reader reader = source.createReader(null)) { + + List results = new ArrayList<>(); + for (boolean available = reader.start(); available; available = reader.advance()) { + WrongTrainType train = reader.getCurrent(); + results.add(train); + } + } + } + + @Test + public void testReadXMLNoBundleSize() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .createSource(); + + List expectedResults = + ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), + new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), + new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + + @Test + public void testReadXMLWithEmptyTags() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), + new Train("Henry", 3, "green", null), new Train("Toby", 7, "brown", null), + new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", null), + new Train("Percy", 6, "green", null), new Train(), new Train()); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + @Category(NeedsRunner.class) + public void testReadXMLSmallPipeline() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + PCollection output = + p.apply( + "ReadFileData", + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); + + List expectedResults = + ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), + new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), + new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); + + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); + } + + @Test + public void testReadXMLWithAttributes() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", "small"), + new Train("Henry", 3, "green", "big"), new Train("Toby", 7, "brown", "small"), + new Train("Gordon", 4, "blue", "big"), new Train("Emily", -1, "red", "small"), + new Train("Percy", 6, "green", "small")); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLWithWhitespaces() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List expectedResults = ImmutableList.of(new Train("Thomas ", 1, "blue", null), + new Train("Henry", 3, "green", null), new Train("Toby", 7, " brown ", null), + new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", null), + new Train("Percy", 6, "green", null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLLarge() throws IOException { + String fileName = "temp.xml"; + List trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + assertThat( + trainsToStrings(trains), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + @Category(NeedsRunner.class) + public void testReadXMLLargePipeline() throws IOException { + String fileName = "temp.xml"; + List trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + PCollection output = + p.apply( + "ReadFileData", + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); + + PAssert.that(output).containsInAnyOrder(trains); + p.run(); + } + + @Test + public void testSplitWithEmptyBundles() throws Exception { + String fileName = "temp.xml"; + List trains = generateRandomTrainList(10); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + List> splits = source.split(100, null); + + assertTrue(splits.size() > 2); + + List results = new ArrayList<>(); + for (BoundedSource split : splits) { + results.addAll(readEverythingFromReader(split.createReader(null))); + } + + assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray())); + } + + @Test + public void testXMLWithSplits() throws Exception { + String fileName = "temp.xml"; + List trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + List> splits = source.split(256, null); + + // Not a trivial split + assertTrue(splits.size() > 2); + + List results = new ArrayList<>(); + for (BoundedSource split : splits) { + results.addAll(readEverythingFromReader(split.createReader(null))); + } + assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray())); + } + + @Test + public void testSplitAtFraction() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + String fileName = "temp.xml"; + List trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource fileSource = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + + List> splits = + fileSource.split(file.length() / 3, null); + for (BoundedSource splitSource : splits) { + int numItems = readEverythingFromReader(splitSource.createReader(null)).size(); + // Should not split while unstarted. + assertSplitAtFractionFails(splitSource, 0, 0.7, options); + assertSplitAtFractionSucceedsAndConsistent(splitSource, 1, 0.7, options); + assertSplitAtFractionSucceedsAndConsistent(splitSource, 15, 0.7, options); + assertSplitAtFractionFails(splitSource, 0, 0.0, options); + assertSplitAtFractionFails(splitSource, 20, 0.3, options); + assertSplitAtFractionFails(splitSource, numItems, 1.0, options); + + // After reading 100 elements we will be approximately at position + // 0.99 * (endOffset - startOffset) hence trying to split at fraction 0.9 will be + // unsuccessful. + assertSplitAtFractionFails(splitSource, numItems, 0.9, options); + + // Following passes since we can always find a fraction that is extremely close to 1 such that + // the position suggested by the fraction will be larger than the position the reader is at + // after reading "items - 1" elements. + // This also passes for "numItemsToReadBeforeSplit = items" if the position at suggested + // fraction is larger than the position the reader is at after reading all "items" elements + // (i.e., the start position of the last element). This is true for most cases but will not + // be true if reader position is only one less than the end position. (i.e., the last element + // of the bundle start at the last byte that belongs to the bundle). + assertSplitAtFractionSucceedsAndConsistent(splitSource, numItems - 1, 0.999, options); + } + } + + @Test + public void testSplitAtFractionExhaustiveSingleByte() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .createSource(); + assertSplitAtFractionExhaustive(source, options); + } + + @Test + @Ignore( + "Multi-byte characters in XML are not supported because the parser " + + "currently does not correctly report byte offsets") + public void testSplitAtFractionExhaustiveMultiByte() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8)); + + BoundedSource source = + XmlIO.read() + .from(file.toPath().toString()) + .withRootElement("දුම්රියන්") + .withRecordElement("දුම්රිය") + .withRecordClass(Train.class) + .createSource(); + assertSplitAtFractionExhaustive(source, options); + } + + @Test + @Category(NeedsRunner.class) + public void testReadXMLFilePattern() throws IOException { + List trains1 = generateRandomTrainList(20); + File file = createRandomTrainXML("temp1.xml", trains1); + List trains2 = generateRandomTrainList(10); + createRandomTrainXML("temp2.xml", trains2); + List trains3 = generateRandomTrainList(15); + createRandomTrainXML("temp3.xml", trains3); + generateRandomTrainList(8); + createRandomTrainXML("otherfile.xml", trains1); + + PCollection output = + p.apply( + "ReadFileData", + XmlIO.read() + .from(file.getParent() + "/" + "temp*.xml") + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); + + List expectedResults = new ArrayList<>(); + expectedResults.addAll(trains1); + expectedResults.addAll(trains2); + expectedResults.addAll(trains3); + + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); + } + + @Test + public void testDisplayData() { + DisplayData displayData = + DisplayData.from( + XmlIO.read() + .from("foo.xml") + .withRootElement("bird") + .withRecordElement("cat") + .withMinBundleSize(1234) + .withRecordClass(Integer.class)); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.xml")); + assertThat(displayData, hasDisplayItem("rootElement", "bird")); + assertThat(displayData, hasDisplayItem("recordElement", "cat")); + assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); + assertThat(displayData, hasDisplayItem("minBundleSize", 1234)); + } +}