crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-517: Make FileSourceImpl implement ReadableSource.
Date Fri, 08 May 2015 21:42:55 GMT
Repository: crunch
Updated Branches:
  refs/heads/master c9be5e87d -> 9f4193163


CRUNCH-517: Make FileSourceImpl implement ReadableSource.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9f419316
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9f419316
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9f419316

Branch: refs/heads/master
Commit: 9f4193163d739cf1da4049f5793455d41e32b888
Parents: c9be5e8
Author: Josh Wills <jwills@apache.org>
Authored: Tue May 5 09:45:08 2015 +0200
Committer: Josh Wills <jwills@apache.org>
Committed: Fri May 8 22:38:43 2015 +0100

----------------------------------------------------------------------
 .../org/apache/crunch/io/FormattedFileIT.java   |  66 ++++++++++
 .../io/impl/DefaultFileReaderFactory.java       | 131 +++++++++++++++++++
 .../apache/crunch/io/impl/FileReadableData.java |  42 ++++++
 .../apache/crunch/io/impl/FileSourceImpl.java   |  14 +-
 4 files changed, 252 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java
new file mode 100644
index 0000000..2b15eac
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class FormattedFileIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testReadFormattedFile() throws Exception {
+    String urlsFile = tmpDir.copyResourceFileName("urls.txt");
+    Pipeline p = new MRPipeline(FormattedFileIT.class, tmpDir.getDefaultConfiguration());
+    PTable<LongWritable, Text> urls = p.read(From.formattedFile(urlsFile,
+        TextInputFormat.class, LongWritable.class, Text.class));
+    List<String> expect = ImmutableList.of("A", "A", "A", "B", "B", "C", "D", "E", "F", "F", "");
+    List<String> actual = Lists.newArrayList(Iterables.transform(urls.materialize(),
+        new Function<Pair<LongWritable, Text>, String>() {
+          @Override
+          public String apply(Pair<LongWritable, Text> pair) {
+            String str = pair.second().toString();
+            if (str.isEmpty()) {
+              return str;
+            }
+            return str.substring(4, 5);
+          }
+        }));
+    assertEquals(expect, actual);
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java
new file mode 100644
index 0000000..90c15fa
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+class DefaultFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultFileReaderFactory.class);
+
+  private final FormatBundle<? extends InputFormat> bundle;
+  private final PType<T> ptype;
+
+  public DefaultFileReaderFactory(FormatBundle<? extends InputFormat> bundle, PType<T> ptype) {
+    this.bundle = bundle;
+    this.ptype = ptype;
+  }
+
+  @Override
+  public Iterator<T> read(FileSystem fs, Path path) {
+    final Configuration conf = new Configuration(fs.getConf());
+    bundle.configure(conf);
+    ptype.initialize(conf);
+
+    final InputFormat fmt = ReflectionUtils.newInstance(bundle.getFormatClass(), conf);
+    final TaskAttemptContext ctxt = TaskAttemptContextFactory.create(conf, new TaskAttemptID());
+    try {
+      Job job = new Job(conf);
+      FileInputFormat.addInputPath(job, path);
+      return Iterators.concat(Lists.transform(fmt.getSplits(job), new Function<InputSplit, Iterator<T>>() {
+        @Override
+        public Iterator<T> apply(InputSplit split) {
+          try {
+            RecordReader reader = fmt.createRecordReader(split, ctxt);
+            reader.initialize(split, ctxt);
+            return new RecordReaderIterator<T>(reader, ptype);
+          } catch (Exception e) {
+            LOG.error("Error reading split: " + split, e);
+            throw new CrunchRuntimeException(e);
+          }
+        }
+      }).iterator());
+    } catch (Exception e) {
+      LOG.error("Error reading path: " + path, e);
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  private static class RecordReaderIterator<T> extends UnmodifiableIterator<T> {
+
+    private final RecordReader reader;
+    private final PType<T> ptype;
+    private T cur;
+    private boolean hasNext;
+
+    public RecordReaderIterator(RecordReader reader, PType<T> ptype) {
+      this.reader = reader;
+      this.ptype = ptype;
+      try {
+        this.hasNext = reader.nextKeyValue();
+        if (hasNext) {
+          Object converted = ptype.getConverter().convertInput(
+                  reader.getCurrentKey(), reader.getCurrentValue());
+          this.cur = ptype.getInputMapFn().map(converted);
+        }
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      T ret = cur;
+      try {
+        hasNext = reader.nextKeyValue();
+        if (hasNext) {
+          Object converted = ptype.getConverter().convertInput(
+                  reader.getCurrentKey(), reader.getCurrentValue());
+          this.cur = ptype.getInputMapFn().map(converted);
+        }
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+      return ret;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java
new file mode 100644
index 0000000..9d9bb93
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+
+import java.util.List;
+
+class FileReadableData<T> extends ReadableDataImpl<T> {
+
+  private final FormatBundle<? extends InputFormat> bundle;
+  private final PType<T> ptype;
+
+  public FileReadableData(List<Path> paths, FormatBundle<? extends InputFormat> bundle, PType<T> ptype) {
+    super(paths);
+    this.bundle = bundle;
+    this.ptype = ptype;
+  }
+  @Override
+  protected FileReaderFactory<T> getFileReaderFactory() {
+    return new DefaultFileReaderFactory<T>(bundle, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index b42d815..27a1167 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -24,12 +24,14 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
@@ -41,7 +43,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FileSourceImpl<T> implements Source<T> {
+public class FileSourceImpl<T> implements ReadableSource<T> {
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSourceImpl.class);
 
@@ -177,4 +179,14 @@ public class FileSourceImpl<T> implements Source<T> {
   public String toString() {
     return new StringBuilder().append(inputBundle.getName()).append("(").append(pathsAsString()).append(")").toString();
   }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return read(conf, new DefaultFileReaderFactory<T>(inputBundle, ptype));
+  }
+
+  @Override
+  public ReadableData<T> asReadable() {
+    return new FileReadableData<T>(paths, inputBundle, ptype);
+  }
 }


Mime
View raw message