beam-commits mailing list archives

From dhalp...@apache.org
Subject [1/3] beam git commit: [BEAM-2016] Delete HdfsFileSource & Sink
Date Fri, 05 May 2017 16:57:22 GMT
Repository: beam
Updated Branches:
  refs/heads/master 3bffe0e00 -> 610bda168


http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
deleted file mode 100644
index 9fa6606..0000000
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.base.MoreObjects;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSink.
- */
-public class HDFSFileSinkTest {
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  private final String part0 = "part-r-00000";
-  private final String foobar = "foobar";
-
-  private <T> void doWrite(Sink<T> sink,
-                           PipelineOptions options,
-                           Iterable<T> toWrite) throws Exception {
-    Sink.WriteOperation<T, String> writeOperation =
-        (Sink.WriteOperation<T, String>) sink.createWriteOperation();
-    Sink.Writer<T, String> writer = writeOperation.createWriter(options);
-    writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
-    for (T t: toWrite) {
-      writer.write(t);
-    }
-    String writeResult = writer.close();
-    writeOperation.finalize(Collections.singletonList(writeResult), options);
-  }
-
-  @Test
-  public void testWriteSingleRecord() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<String, NullWritable, Text> sink =
-        HDFSFileSink.to(
-            file.toString(),
-            SequenceFileOutputFormat.class,
-            NullWritable.class,
-            Text.class,
-            new SerializableFunction<String, KV<NullWritable, Text>>() {
-              @Override
-              public KV<NullWritable, Text> apply(String input) {
-                return KV.of(NullWritable.get(), new Text(input));
-              }
-            });
-
-    doWrite(sink, options, Collections.singletonList(foobar));
-
-    SequenceFile.Reader.Option opts =
-        SequenceFile.Reader.file(new Path(file.toString(), part0));
-    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts);
-    assertEquals(NullWritable.class.getName(), reader.getKeyClassName());
-    assertEquals(Text.class.getName(), reader.getValueClassName());
-    NullWritable k = NullWritable.get();
-    Text v = new Text();
-    assertEquals(true, reader.next(k, v));
-    assertEquals(NullWritable.get(), k);
-    assertEquals(new Text(foobar), v);
-  }
-
-  @Test
-  public void testToText() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString());
-
-    doWrite(sink, options, Collections.singletonList(foobar));
-
-    List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(),
-        Charset.forName("UTF-8"));
-    assertEquals(Collections.singletonList(foobar), strings);
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  static class GenericClass {
-    int intField;
-    String stringField;
-    public GenericClass() {}
-    public GenericClass(int intValue, String stringValue) {
-      this.intField = intValue;
-      this.stringField = stringValue;
-    }
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("intField", intField)
-          .add("stringField", stringField)
-          .toString();
-    }
-    @Override
-    public int hashCode() {
-      return Objects.hash(intField, stringField);
-    }
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof GenericClass)) {
-        return false;
-      }
-      GenericClass o = (GenericClass) other;
-      return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField);
-    }
-  }
-
-  @Test
-  public void testToAvro() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro(
-        file.toString(),
-        AvroCoder.of(GenericClass.class),
-        new Configuration(false));
-
-    doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar")));
-
-    GenericDatumReader datumReader = new GenericDatumReader();
-    FileReader<GenericData.Record> reader =
-        DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader);
-    GenericData.Record next = reader.next(null);
-    assertEquals("foobar", next.get("stringField").toString());
-    assertEquals(3, next.get("intField"));
-  }
-
-}
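
The deleted sink tests above exercised three write paths: SequenceFile, plain text, and Avro. After this removal, the equivalent Avro write goes through Beam's built-in IO transforms rather than HDFSFileSink. Below is a minimal sketch under that assumption, using the Beam 2.x AvroIO API; the record class and output prefix are illustrative, not taken from this commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class AvroWriteSketch {

  // Hypothetical record type, mirroring GenericClass from the deleted test.
  @DefaultCoder(AvroCoder.class)
  static class Record {
    int intField;
    String stringField;
    Record() {}
    Record(int intField, String stringField) {
      this.intField = intField;
      this.stringField = stringField;
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of(new Record(3, "foobar")))
        // The output prefix is a placeholder; an hdfs:// prefix also works once
        // the hadoop-file-system module is on the classpath and configured.
        .apply(AvroIO.write(Record.class).to("/tmp/avro-out/records"));
    p.run().waitUntilFinish();
  }
}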

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
deleted file mode 100644
index a964239..0000000
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSource.
- */
-public class HDFSFileSourceTest {
-
-  private Random random = new Random(0L);
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testFullyReadSingleFile() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-            HDFSFileSource.from(
-                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadSingleFileWithSpaces() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp data.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-            HDFSFileSource.from(
-                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadFilePattern() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
-    expectedResults.addAll(data1);
-    expectedResults.addAll(data2);
-    expectedResults.addAll(data3);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testCloseUnstartedFilePatternReader() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
-
-    // Closing an unstarted FilePatternReader should not throw an exception.
-    try {
-      reader.close();
-    } catch (Exception e) {
-      fail("Closing an unstarted FilePatternReader should not throw an exception");
-    }
-  }
-
-  @Test
-  public void testSplits() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    // Assert that the source produces the expected records
-    assertEquals(expectedResults, readFromSource(source, options));
-
-    // Split with a small bundle size (has to be at least size of sync interval)
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
-        .split(SequenceFile.SYNC_INTERVAL, options);
-    assertTrue(splits.size() > 2);
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-    int nonEmptySplits = 0;
-    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
-      if (readFromSource(subSource, options).size() > 0) {
-        nonEmptySplits += 1;
-      }
-    }
-    assertTrue(nonEmptySplits > 2);
-  }
-
-  @Test
-  public void testSplitEstimatedSize() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.avro", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    long originalSize = source.getEstimatedSizeBytes(options);
-    long splitTotalSize = 0;
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
-        SequenceFile.SYNC_INTERVAL, options
-    );
-    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
-      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
-    }
-    // Assert that the estimated size of the whole is the sum of its parts
-    assertEquals(originalSize, splitTotalSize);
-  }
-
-  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
-      throws IOException {
-    File tmpFile = tmpFolder.newFile(filename);
-    try (Writer writer = SequenceFile.createWriter(new Configuration(),
-        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-        Writer.file(new Path(tmpFile.toURI())))) {
-
-      for (KV<IntWritable, Text> record : records) {
-        writer.append(record.getKey(), record.getValue());
-      }
-    }
-    return tmpFile;
-  }
-
-  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
-                                                          int numItems, int offset) {
-    List<KV<IntWritable, Text>> records = new ArrayList<>();
-    for (int i = 0; i < numItems; i++) {
-      IntWritable key = new IntWritable(i + offset);
-      Text value = new Text(createRandomString(dataItemLength));
-      records.add(KV.of(key, value));
-    }
-    return records;
-  }
-
-  private String createRandomString(int length) {
-    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      builder.append(chars[random.nextInt(chars.length)]);
-    }
-    return builder.toString();
-  }
-
-}
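
With HDFSFileSource gone as well, reads from HDFS are expected to go through Beam's FileSystems layer, which this same hadoop-file-system module now implements. Below is a minimal sketch, assuming the HadoopFileSystemOptions API from that module; the namenode address and paths are placeholders.

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

public class HdfsReadSketch {
  public static void main(String[] args) {
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

    // fs.defaultFS is the standard Hadoop key; the host and port are placeholders.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020");
    options.setHdfsConfiguration(Collections.singletonList(conf));

    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.read().from("hdfs://namenode:8020/input/part-*"))
        .apply(TextIO.write().to("hdfs://namenode:8020/output/result"));
    p.run().waitUntilFinish();
  }
}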

