beam-commits mailing list archives

From dhalp...@apache.org
Subject [3/4] beam git commit: Remove obsolete classes
Date Thu, 23 Feb 2017 00:20:26 GMT
Remove obsolete classes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff208ccd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff208ccd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff208ccd

Branch: refs/heads/master
Commit: ff208ccd3e4493ce1eb0690f95c27ad270fe191a
Parents: f303b98
Author: Rafal Wojdyla <rav@spotify.com>
Authored: Thu Feb 16 00:22:28 2017 -0500
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Feb 22 16:20:14 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/hdfs/AvroHDFSFileSource.java    | 142 -------------------
 .../beam/sdk/io/hdfs/AvroWrapperCoder.java      | 114 ---------------
 .../SimpleAuthAvroHDFSFileSource.java           |  82 -----------
 .../hdfs/simpleauth/SimpleAuthHDFSFileSink.java | 131 -----------------
 .../simpleauth/SimpleAuthHDFSFileSource.java    | 117 ---------------
 .../sdk/io/hdfs/simpleauth/package-info.java    |  22 ---
 .../beam/sdk/io/hdfs/AvroWrapperCoderTest.java  |  50 -------
 7 files changed, 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ff208ccd/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
deleted file mode 100644
index 92fe5a6..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A {@code BoundedSource} for reading Avro files resident in a Hadoop filesystem.
- *
- * @param <T> The type of the Avro records to be read from the source.
- */
-public class AvroHDFSFileSource<T> extends HDFSFileSource<AvroKey<T>, NullWritable> {
-  private static final long serialVersionUID = 0L;
-
-  protected final AvroCoder<T> avroCoder;
-  private final String schemaStr;
-
-  public AvroHDFSFileSource(String filepattern, AvroCoder<T> avroCoder) {
-    this(filepattern, avroCoder, null);
-  }
-
-  public AvroHDFSFileSource(String filepattern,
-                            AvroCoder<T> avroCoder,
-                            HDFSFileSource.SerializableSplit serializableSplit) {
-    super(filepattern,
-        ClassUtil.<AvroKeyInputFormat<T>>castClass(AvroKeyInputFormat.class),
-        ClassUtil.<AvroKey<T>>castClass(AvroKey.class),
-        NullWritable.class, serializableSplit);
-    this.avroCoder = avroCoder;
-    this.schemaStr = avroCoder.getSchema().toString();
-  }
-
-  @Override
-  public Coder<KV<AvroKey<T>, NullWritable>> getDefaultOutputCoder() {
-    AvroWrapperCoder<AvroKey<T>, T> keyCoder = AvroWrapperCoder.of(this.getKeyClass(), avroCoder);
-    return KvCoder.of(keyCoder, WritableCoder.of(NullWritable.class));
-  }
-
-  @Override
-  public List<? extends AvroHDFSFileSource<T>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    if (serializableSplit == null) {
-      return Lists.transform(computeSplits(desiredBundleSizeBytes),
-          new Function<InputSplit, AvroHDFSFileSource<T>>() {
-            @Override
-            public AvroHDFSFileSource<T> apply(@Nullable InputSplit inputSplit) {
-              return new AvroHDFSFileSource<>(
-                  filepattern, avroCoder, new SerializableSplit(inputSplit));
-            }
-          });
-    } else {
-      return ImmutableList.of(this);
-    }
-  }
-
-  @Override
-  public BoundedReader<KV<AvroKey<T>, NullWritable>> createReader(PipelineOptions options)
-      throws IOException {
-    this.validate();
-
-    Schema schema = new Schema.Parser().parse(schemaStr);
-    if (serializableSplit == null) {
-      return new AvroHDFSFileReader<>(this, filepattern, formatClass, schema);
-    } else {
-      return new AvroHDFSFileReader<>(this, filepattern, formatClass, schema,
-          serializableSplit.getSplit());
-    }
-  }
-
-  static class AvroHDFSFileReader<T> extends HDFSFileReader<AvroKey<T>, NullWritable> {
-    public AvroHDFSFileReader(BoundedSource<KV<AvroKey<T>, NullWritable>> source,
-                              String filepattern,
-                              Class<? extends FileInputFormat<?, ?>> formatClass,
-                              Schema schema) throws IOException {
-      this(source, filepattern, formatClass, schema, null);
-    }
-
-    public AvroHDFSFileReader(BoundedSource<KV<AvroKey<T>, NullWritable>> source,
-                              String filepattern,
-                              Class<? extends FileInputFormat<?, ?>> formatClass,
-                              Schema schema, InputSplit split) throws IOException {
-      super(source, filepattern, formatClass, split);
-      AvroJob.setInputKeySchema(job, schema);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected KV<AvroKey<T>, NullWritable> nextPair() throws IOException, InterruptedException {
-      AvroKey<T> key = currentReader.getCurrentKey();
-      NullWritable value = currentReader.getCurrentValue();
-
-      // clone the record to work around identical element issue due to object reuse
-      Coder<T> avroCoder = ((AvroHDFSFileSource<T>) this.getCurrentSource()).avroCoder;
-      key = new AvroKey<>(CoderUtils.clone(avroCoder, key.datum()));
-
-      return KV.of(key, value);
-    }
-
-  }
-
-  static class ClassUtil {
-    @SuppressWarnings("unchecked")
-    static <T> Class<T> castClass(Class<?> aClass) {
-      return (Class<T>) aClass;
-    }
-  }
-
-}
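
[The nextPair() override above deep-copies each record through its coder because Hadoop RecordReaders reuse a single key instance across calls. Below is a minimal sketch of that clone-via-coder trick in isolation; the class and helper names are hypothetical, while CoderUtils.clone and AvroCoder are real Beam SDK APIs.]

    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class CoderCloneSketch {
      // Encode-then-decode through the coder yields an independent copy,
      // so a key instance reused by the RecordReader can be emitted safely.
      static <T> T deepCopy(Coder<T> coder, T value) throws Exception {
        return CoderUtils.clone(coder, value);
      }

      public static void main(String[] args) throws Exception {
        AvroCoder<String> coder = AvroCoder.of(String.class);
        System.out.println(deepCopy(coder, "reused-record"));
      }
    }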

http://git-wip-us.apache.org/repos/asf/beam/blob/ff208ccd/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
deleted file mode 100644
index 7e01846..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
-
-/**
- * A {@code AvroWrapperCoder} is a {@link Coder} for a Java class that implements {@link
- * AvroWrapper}.
- *
- * @param <WrapperT> the type of the wrapper
- * @param <DatumT> the type of the datum
- */
-public class AvroWrapperCoder<WrapperT extends AvroWrapper<DatumT>, DatumT>
-    extends StandardCoder<WrapperT> {
-  private static final long serialVersionUID = 0L;
-
-  private final Class<WrapperT> wrapperType;
-  private final AvroCoder<DatumT> datumCoder;
-
-  private AvroWrapperCoder(Class<WrapperT> wrapperType, AvroCoder<DatumT> datumCoder) {
-    this.wrapperType = wrapperType;
-    this.datumCoder = datumCoder;
-  }
-
-  /**
-   * Return a {@code AvroWrapperCoder} instance for the provided element class.
-   * @param <WrapperT> the type of the wrapper
-   * @param <DatumT> the type of the datum
-   */
-  public static <WrapperT extends AvroWrapper<DatumT>, DatumT>
-  AvroWrapperCoder<WrapperT, DatumT> of(Class<WrapperT> wrapperType, AvroCoder<DatumT> datumCoder) {
-    return new AvroWrapperCoder<>(wrapperType, datumCoder);
-  }
-
-  @JsonCreator
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static AvroWrapperCoder<?, ?> of(
-      @JsonProperty("wrapperType") String wrapperType,
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(wrapperType);
-    if (!AvroWrapper.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + wrapperType + " does not implement AvroWrapper");
-    }
-    checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
-    return of((Class<? extends AvroWrapper>) clazz, (AvroCoder<?>) components.get(0));
-  }
-
-  @Override
-  public void encode(WrapperT value, OutputStream outStream, Context context) throws IOException {
-    datumCoder.encode(value.datum(), outStream, context);
-  }
-
-  @Override
-  public WrapperT decode(InputStream inStream, Context context) throws IOException {
-    try {
-      WrapperT wrapper = wrapperType.newInstance();
-      wrapper.datum(datumCoder.decode(inStream, context));
-      return wrapper;
-    } catch (InstantiationException | IllegalAccessException e) {
-      throw new CoderException("unable to deserialize record", e);
-    }
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Collections.singletonList(datumCoder);
-  }
-
-  @Override
-  public CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    result.put("wrapperType", wrapperType.getName());
-    return result;
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    datumCoder.verifyDeterministic();
-  }
-
-}
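
[The coder above is pure delegation: encoding a wrapper writes only its datum's bytes, and decoding reflectively instantiates the wrapper and fills the datum back in. A minimal sketch of that delegation pattern using plain Java streams, with no Beam types; all names here are hypothetical.]

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Hypothetical stand-ins for Coder and AvroWrapper.
    interface Codec<T> {
      void encode(T value, OutputStream out) throws IOException;
      T decode(InputStream in) throws IOException;
    }

    class Holder<T> {                 // plays the role of AvroWrapper
      T datum;
    }

    class HolderCodec<T> implements Codec<Holder<T>> {
      private final Codec<T> datumCodec;
      HolderCodec(Codec<T> datumCodec) { this.datumCodec = datumCodec; }

      @Override
      public void encode(Holder<T> value, OutputStream out) throws IOException {
        datumCodec.encode(value.datum, out);      // the wrapper adds no bytes
      }

      @Override
      public Holder<T> decode(InputStream in) throws IOException {
        Holder<T> wrapper = new Holder<>();       // rebuild the wrapper
        wrapper.datum = datumCodec.decode(in);    // fill in the decoded datum
        return wrapper;
      }
    }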

http://git-wip-us.apache.org/repos/asf/beam/blob/ff208ccd/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
deleted file mode 100644
index 547413f..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs.simpleauth;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.io.hdfs.AvroHDFSFileSource;
-import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- * Source for Avros on Hadoop/HDFS with Simple Authentication.
- *
- * <p>Allows to set arbitrary username as HDFS user, which is used for reading Avro from HDFS.
- */
-public class SimpleAuthAvroHDFSFileSource<T> extends AvroHDFSFileSource<T> {
-  // keep this field to pass Hadoop user between workers
-  private final String username;
-
-  /**
-   * Create a {@code SimpleAuthAvroHDFSFileSource} based on a file or a file pattern specification.
-   * @param username HDFS username.
-   */
-  public SimpleAuthAvroHDFSFileSource(String filepattern,
-                                      AvroCoder<T> avroCoder,
-                                      String username) {
-    super(filepattern, avroCoder);
-    this.username = username;
-  }
-
-  /**
-   * Create a {@code SimpleAuthAvroHDFSFileSource} based on a single Hadoop input split, which won't
-   * be split up further.
-   * @param username HDFS username.
-   */
-  public SimpleAuthAvroHDFSFileSource(String filepattern,
-                                      AvroCoder<T> avroCoder,
-                                      HDFSFileSource.SerializableSplit serializableSplit,
-                                      String username) {
-    super(filepattern, avroCoder, serializableSplit);
-    this.username = username;
-  }
-
-  @Override
-  public List<? extends AvroHDFSFileSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
-                                                                PipelineOptions options)
-      throws Exception {
-    if (serializableSplit == null) {
-      return Lists.transform(computeSplits(desiredBundleSizeBytes),
-          new Function<InputSplit, AvroHDFSFileSource<T>>() {
-            @Override
-            public AvroHDFSFileSource<T> apply(@Nullable InputSplit inputSplit) {
-              return new SimpleAuthAvroHDFSFileSource<>(
-                  filepattern, avroCoder, new HDFSFileSource.SerializableSplit(inputSplit),
-                  username);
-            }
-          });
-    } else {
-      return ImmutableList.of(this);
-    }
-  }
-}
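
[Both this class and its parent implement splitIntoBundles the same way: Lists.transform lazily maps each Hadoop InputSplit to a per-split copy of the source. The Guava idiom in isolation, as a sketch with hypothetical stand-in types.]

    import com.google.common.base.Function;
    import com.google.common.collect.Lists;
    import java.util.Arrays;
    import java.util.List;

    public class SplitTransformSketch {
      public static void main(String[] args) {
        List<String> splits = Arrays.asList("split-0", "split-1");
        // Lists.transform returns a lazily transformed view, just as the
        // deleted sources mapped InputSplits to per-split sources.
        List<Integer> bundles = Lists.transform(splits,
            new Function<String, Integer>() {
              @Override
              public Integer apply(String split) {
                return split.length();
              }
            });
        System.out.println(bundles);  // [7, 7]
      }
    }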

http://git-wip-us.apache.org/repos/asf/beam/blob/ff208ccd/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
deleted file mode 100644
index 28accfa..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs.simpleauth;
-
-import java.security.PrivilegedExceptionAction;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * A {@code Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output
- * format with Simple Authentication.
- *
- * <p>Allows arbitrary username as HDFS user, which is used for writing to HDFS.
- *
- * @param <K> The type of keys to be written to the sink.
- * @param <V> The type of values to be written to the sink.
- */
-public class SimpleAuthHDFSFileSink<K, V> extends HDFSFileSink<K, V> {
-  private final String username;
-
-  public SimpleAuthHDFSFileSink(String path,
-                                Class<? extends FileOutputFormat<K, V>> formatClass,
-                                Configuration conf,
-                                String username) {
-    super(path, formatClass, conf);
-    this.username = username;
-  }
-
-  @Override
-  public WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions options) {
-    return new SimpleAuthHDFSWriteOperation<>(this, path, formatClass, username);
-  }
-
-  /** {{@link WriteOperation}} for HDFS with Simple Authentication. */
-  public static class SimpleAuthHDFSWriteOperation<K, V> extends HDFSWriteOperation<K, V> {
-    private final String username;
-
-    SimpleAuthHDFSWriteOperation(Sink<KV<K, V>> sink,
-                                 String path,
-                                 Class<? extends FileOutputFormat<K, V>> formatClass,
-                                 String username) {
-      super(sink, path, formatClass);
-      this.username = username;
-    }
-
-    @Override
-    public void finalize(final Iterable<String> writerResults, final PipelineOptions options)
-        throws Exception {
-      UserGroupInformation.createRemoteUser(username).doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          superFinalize(writerResults, options);
-          return null;
-        }
-      });
-    }
-
-    private void superFinalize(Iterable<String> writerResults, PipelineOptions options)
-        throws Exception {
-      super.finalize(writerResults, options);
-    }
-
-    @Override
-    public Writer<KV<K, V>, String> createWriter(PipelineOptions options) throws Exception {
-      return new SimpleAuthHDFSWriter<>(this, path, formatClass, username);
-    }
-  }
-
-  /** {{@link Writer}} for HDFS files with Simple Authentication. */
-  public static class SimpleAuthHDFSWriter<K, V> extends HDFSWriter<K, V> {
-    private final UserGroupInformation ugi;
-
-    public SimpleAuthHDFSWriter(SimpleAuthHDFSWriteOperation<K, V> writeOperation,
-                                String path,
-                                Class<? extends FileOutputFormat<K, V>> formatClass,
-                                String username) {
-      super(writeOperation, path, formatClass);
-      ugi = UserGroupInformation.createRemoteUser(username);
-    }
-
-    @Override
-    public void open(final String uId) throws Exception {
-      ugi.doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          superOpen(uId);
-          return null;
-        }
-      });
-    }
-
-    private void superOpen(String uId) throws Exception {
-      super.open(uId);
-    }
-
-    @Override
-    public String close() throws Exception {
-      return ugi.doAs(new PrivilegedExceptionAction<String>() {
-        @Override
-        public String run() throws Exception {
-          return superClose();
-        }
-      });
-    }
-
-    private String superClose() throws Exception {
-      return super.close();
-    }
-  }
-
-}
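
[Every override in this sink follows one shape: wrap the superclass call in UserGroupInformation.doAs so it executes as the configured remote user. A minimal sketch of that pattern; runAs is a hypothetical helper, while createRemoteUser, doAs, and getCurrentUser are real Hadoop security APIs.]

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
      // Runs the action as `username` using simple (non-Kerberos) auth.
      static <T> T runAs(String username, PrivilegedExceptionAction<T> action)
          throws Exception {
        return UserGroupInformation.createRemoteUser(username).doAs(action);
      }

      public static void main(String[] args) throws Exception {
        String who = runAs("alice", new PrivilegedExceptionAction<String>() {
          @Override
          public String run() throws Exception {
            return UserGroupInformation.getCurrentUser().getUserName();
          }
        });
        System.out.println(who);  // prints "alice"
      }
    }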

http://git-wip-us.apache.org/repos/asf/beam/blob/ff208ccd/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
deleted file mode 100644
index 22191f0..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs.simpleauth;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * Source for Hadoop/HDFS with Simple Authentication.
- *
- * <p>Allows to set arbitrary username as HDFS user, which is used for reading from HDFS.
- */
-public class SimpleAuthHDFSFileSource<K, V> extends HDFSFileSource<K, V> {
-  private final String username;
-  /**
-   * Create a {@code SimpleAuthHDFSFileSource} based on a single Hadoop input split, which won't be
-   * split up further.
-   * @param username HDFS username.
-   */
-  protected SimpleAuthHDFSFileSource(String filepattern,
-                                     Class<? extends FileInputFormat<?, ?>> formatClass,
-                                     Class<K> keyClass,
-                                     Class<V> valueClass,
-                                     HDFSFileSource.SerializableSplit serializableSplit,
-                                     String username) {
-    super(filepattern, formatClass, keyClass, valueClass, serializableSplit);
-    this.username = username;
-  }
-
-  /**
-   * Create a {@code SimpleAuthHDFSFileSource} based on a file or a file pattern specification.
-   * @param username HDFS username.
-   */
-  protected SimpleAuthHDFSFileSource(String filepattern,
-                                     Class<? extends FileInputFormat<?, ?>> formatClass,
-                                     Class<K> keyClass,
-                                     Class<V> valueClass,
-                                     String username) {
-    super(filepattern, formatClass, keyClass, valueClass);
-    this.username = username;
-  }
-
-  /**
-   * Creates a {@code Read} transform that will read from an {@code SimpleAuthHDFSFileSource}
-   * with the given file name or pattern ("glob") using the given Hadoop {@link FileInputFormat},
-   * with key-value types specified
-   * by the given key class and value class.
-   * @param username HDFS username.
-   */
-  public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom(
-      String filepattern,
-      Class<T> formatClass,
-      Class<K> keyClass,
-      Class<V> valueClass,
-      String username) {
-    return Read.from(from(filepattern, formatClass, keyClass, valueClass, username));
-  }
-
-  /**
-   * Creates a {@code SimpleAuthHDFSFileSource} that reads from the given file name or pattern
-   * ("glob") using the given Hadoop {@link FileInputFormat}, with key-value types specified
by the
-   * given key class and value class.
-   * @param username HDFS username.
-   */
-  public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> from(
-      String filepattern,
-      Class<T> formatClass,
-      Class<K> keyClass,
-      Class<V> valueClass,
-      String username) {
-    return new SimpleAuthHDFSFileSource<>(filepattern, formatClass, keyClass, valueClass, username);
-  }
-
-  @Override
-  public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    if (serializableSplit == null) {
-      return Lists.transform(computeSplits(desiredBundleSizeBytes),
-          new Function<InputSplit, BoundedSource<KV<K, V>>>() {
-            @Nullable
-            @Override
-            public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
-              return new SimpleAuthHDFSFileSource<>(filepattern, formatClass, keyClass,
-                  valueClass, new HDFSFileSource.SerializableSplit(inputSplit),
-                  username);
-            }
-          });
-    } else {
-      return ImmutableList.of(this);
-    }
-  }
-}
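
[For context, a hypothetical usage sketch of the removed readFrom factory, per its javadoc: reading text files from HDFS as a named user. The path, username, and input-format choice are illustrative only, and the class itself is the one this commit deletes.]

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hdfs.simpleauth.SimpleAuthHDFSFileSource;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class ReadFromSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // Each element is a KV<LongWritable, Text> (byte offset and line),
        // read from HDFS as user "alice".
        p.apply(SimpleAuthHDFSFileSource.<LongWritable, Text, TextInputFormat>readFrom(
            "hdfs://namenode/logs/part-*", TextInputFormat.class,
            LongWritable.class, Text.class, "alice"));
        p.run();
      }
    }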

http://git-wip-us.apache.org/repos/asf/beam/blob/ff208ccd/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/package-info.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/package-info.java
deleted file mode 100644
index 201f227..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms used to read from the Hadoop file system (HDFS) with authentication.
- */
-package org.apache.beam.sdk.io.hdfs.simpleauth;

http://git-wip-us.apache.org/repos/asf/beam/blob/ff208ccd/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
deleted file mode 100644
index 4d9c819..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.junit.Test;
-
-/**
- * Tests for AvroWrapperCoder.
- */
-public class AvroWrapperCoderTest {
-
-  @Test
-  public void testAvroKeyEncoding() throws Exception {
-    AvroKey<Integer> value = new AvroKey<>(42);
-    AvroWrapperCoder<AvroKey<Integer>, Integer> coder = AvroWrapperCoder.of(
-        AvroHDFSFileSource.ClassUtil.<AvroKey<Integer>>castClass(AvroKey.class),
-        AvroCoder.of(Integer.class));
-
-    CoderProperties.coderDecodeEncodeEqual(coder, value);
-  }
-
-  @Test
-  public void testAvroValueEncoding() throws Exception {
-    AvroValue<Integer> value = new AvroValue<>(42);
-    AvroWrapperCoder<AvroValue<Integer>, Integer> coder = AvroWrapperCoder.of(
-        AvroHDFSFileSource.ClassUtil.<AvroValue<Integer>>castClass(AvroValue.class),
-        AvroCoder.of(Integer.class));
-
-    CoderProperties.coderDecodeEncodeEqual(coder, value);
-  }
-}
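
[CoderProperties.coderDecodeEncodeEqual, used by the deleted test above, asserts that an encode/decode round trip preserves equality. The same property check against a stock Beam coder, as a minimal sketch.]

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.testing.CoderProperties;
    import org.junit.Test;

    public class RoundTripSketch {
      @Test
      public void testVarIntRoundTrip() throws Exception {
        // Encode 42, decode it back, and assert the result equals the input.
        CoderProperties.coderDecodeEncodeEqual(VarIntCoder.of(), 42);
      }
    }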

