Return-Path:
X-Original-To: apmail-crunch-commits-archive@www.apache.org
Delivered-To: apmail-crunch-commits-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AA5DF10907 for ; Tue, 23 Apr 2013 20:41:09 +0000 (UTC)
Received: (qmail 44183 invoked by uid 500); 23 Apr 2013 20:41:06 -0000
Delivered-To: apmail-crunch-commits-archive@crunch.apache.org
Received: (qmail 43974 invoked by uid 500); 23 Apr 2013 20:41:05 -0000
Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@crunch.apache.org
Delivered-To: mailing list commits@crunch.apache.org
Received: (qmail 43215 invoked by uid 99); 23 Apr 2013 20:41:04 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Apr 2013 20:41:04 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4067D8231AB; Tue, 23 Apr 2013 20:41:04 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: jwills@apache.org
To: commits@crunch.apache.org
Date: Tue, 23 Apr 2013 20:41:11 -0000
Message-Id: <14fdaa2fbeca45b580701941e34cc22b@git.apache.org>
In-Reply-To: <5c5792aee18145bb82d063830978ad2a@git.apache.org>
References: <5c5792aee18145bb82d063830978ad2a@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [09/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
deleted file mode 100644
index 4d2b88a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.Source;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Target;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-class SourceTargetImpl implements SourceTarget {
-
-  protected final Source source;
-  protected final Target target;
-
-  public SourceTargetImpl(Source source, Target target) {
-    this.source = source;
-    this.target = target;
-  }
-
-  @Override
-  public PType getType() {
-    return source.getType();
-  }
-
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-    source.configureSource(job, inputId);
-  }
-
-  @Override
-  public long getSize(Configuration configuration) {
-    return source.getSize(configuration);
-  }
-
-  @Override
-  public boolean accept(OutputHandler handler, PType ptype) {
-    return target.accept(handler, ptype);
-  }
-
-  @Override
-  public SourceTarget asSourceTarget(PType ptype) {
-    return target.asSourceTarget(ptype);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other.getClass().equals(getClass()))) {
-      return false;
-    }
-    SourceTargetImpl sti = (SourceTargetImpl) other;
-    return source.equals(sti.source) && target.equals(sti.target);
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(source).append(target).toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return source.toString();
-  }
-
-  @Override
-  public void handleExisting(WriteMode strategy, Configuration conf) {
-    target.handleExisting(strategy, conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
deleted file mode 100644
index a8ff639..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.crunch.io.impl; - -import org.apache.crunch.Pair; -import org.apache.crunch.TableSource; -import org.apache.crunch.io.FileNamingScheme; -import org.apache.crunch.io.PathTarget; -import org.apache.crunch.io.SequentialFileNamingScheme; -import org.apache.crunch.types.PTableType; - -public class TableSourcePathTargetImpl extends SourcePathTargetImpl> implements TableSource { - - public TableSourcePathTargetImpl(TableSource source, PathTarget target) { - this(source, target, new SequentialFileNamingScheme()); - } - - public TableSourcePathTargetImpl(TableSource source, PathTarget target, FileNamingScheme fileNamingScheme) { - super(source, target, fileNamingScheme); - } - - @Override - public PTableType getTableType() { - return ((TableSource) source).getTableType(); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java deleted file mode 100644 index 965b0f9..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.impl; - -import org.apache.crunch.Pair; -import org.apache.crunch.TableSource; -import org.apache.crunch.Target; -import org.apache.crunch.types.PTableType; - -public class TableSourceTargetImpl extends SourceTargetImpl> implements TableSource { - - public TableSourceTargetImpl(TableSource source, Target target) { - super(source, target); - } - - @Override - public PTableType getTableType() { - return ((TableSource) source).getTableType(); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/package-info.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/package-info.java b/crunch/src/main/java/org/apache/crunch/io/package-info.java deleted file mode 100644 index 022bc99..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Data input and output for Pipelines. - */ -package org.apache.crunch.io; http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java deleted file mode 100644 index ba07506..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.seq; - -import org.apache.crunch.MapFn; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.writable.WritableType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; - -class SeqFileHelper { - static Writable newInstance(PType ptype, Configuration conf) { - return (Writable) ReflectionUtils.newInstance(((WritableType) ptype).getSerializationClass(), conf); - } - - static MapFn getInputMapFn(PType ptype) { - return ptype.getInputMapFn(); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java deleted file mode 100644 index 3f45644..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.seq; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.crunch.MapFn; -import org.apache.crunch.fn.IdentityFn; -import org.apache.crunch.io.FileReaderFactory; -import org.apache.crunch.io.impl.AutoClosingIterator; -import org.apache.crunch.types.Converter; -import org.apache.crunch.types.PTableType; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.writable.Writables; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; - -import com.google.common.collect.Iterators; -import com.google.common.collect.UnmodifiableIterator; - -public class SeqFileReaderFactory implements FileReaderFactory { - - private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class); - - private final Converter converter; - private final MapFn mapFn; - private final Writable key; - private final Writable value; - - public SeqFileReaderFactory(PType ptype) { - this.converter = ptype.getConverter(); - this.mapFn = ptype.getInputMapFn(); - if (ptype instanceof PTableType) { - PTableType ptt = (PTableType) ptype; - this.key = SeqFileHelper.newInstance(ptt.getKeyType(), null); - this.value = SeqFileHelper.newInstance(ptt.getValueType(), null); - } else { - this.key = NullWritable.get(); - this.value = SeqFileHelper.newInstance(ptype, null); - } - } - - public SeqFileReaderFactory(Class clazz) { - PType ptype = Writables.writables(clazz); - this.converter = ptype.getConverter(); - this.mapFn = ptype.getInputMapFn(); - this.key = NullWritable.get(); - this.value = (Writable) ReflectionUtils.newInstance(clazz, null); - } - - @Override - public Iterator read(FileSystem fs, final Path path) { - mapFn.initialize(); - try { - final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, fs.getConf()); - return new AutoClosingIterator(reader, new UnmodifiableIterator() { - boolean nextChecked = false; - boolean hasNext = false; - - @Override - public boolean hasNext() { - if (nextChecked == true) { - return hasNext; - } - try { - hasNext = reader.next(key, value); - nextChecked = true; - return hasNext; - } catch (IOException e) { - LOG.info("Error reading from path: " + path, e); - return false; - } - } - - @Override - public T next() { - if (!nextChecked && !hasNext()) { - return null; - } - nextChecked = false; - return mapFn.map(converter.convertInput(key, value)); - } - }); - } catch (IOException e) { - LOG.info("Could not read seqfile at path: " + path, e); - return Iterators.emptyIterator(); - } - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java 
b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java deleted file mode 100644 index 8fac4ae..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.seq; - -import java.io.IOException; - -import org.apache.crunch.io.CompositePathIterable; -import org.apache.crunch.io.ReadableSource; -import org.apache.crunch.io.impl.FileSourceImpl; -import org.apache.crunch.types.PType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; - -public class SeqFileSource extends FileSourceImpl implements ReadableSource { - - public SeqFileSource(Path path, PType ptype) { - super(path, ptype, SequenceFileInputFormat.class); - } - - @Override - public Iterable read(Configuration conf) throws IOException { - FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, new SeqFileReaderFactory(ptype)); - } - - @Override - public String toString() { - return "SeqFile(" + path.toString() + ")"; - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java deleted file mode 100644 index adc739f..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.io.seq; - -import org.apache.crunch.io.FileNamingScheme; -import org.apache.crunch.io.SequentialFileNamingScheme; -import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; -import org.apache.crunch.types.PType; -import org.apache.hadoop.fs.Path; - -public class SeqFileSourceTarget extends ReadableSourcePathTargetImpl { - - public SeqFileSourceTarget(String path, PType ptype) { - this(new Path(path), ptype); - } - - public SeqFileSourceTarget(Path path, PType ptype) { - this(path, ptype, new SequentialFileNamingScheme()); - } - - public SeqFileSourceTarget(Path path, PType ptype, FileNamingScheme fileNamingScheme) { - super(new SeqFileSource(path, ptype), new SeqFileTarget(path), fileNamingScheme); - } - - @Override - public String toString() { - return target.toString(); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java deleted file mode 100644 index 7a63272..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.seq; - -import java.io.IOException; - -import org.apache.crunch.Pair; -import org.apache.crunch.io.CompositePathIterable; -import org.apache.crunch.io.ReadableSource; -import org.apache.crunch.io.impl.FileTableSourceImpl; -import org.apache.crunch.types.PTableType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; - -/** - * A {@code TableSource} that uses {@code SequenceFileInputFormat} to read the input - * file. 
- */ -public class SeqFileTableSource extends FileTableSourceImpl implements ReadableSource> { - - public SeqFileTableSource(String path, PTableType ptype) { - this(new Path(path), ptype); - } - - public SeqFileTableSource(Path path, PTableType ptype) { - super(path, ptype, SequenceFileInputFormat.class); - } - - @Override - public Iterable> read(Configuration conf) throws IOException { - FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, - new SeqFileReaderFactory>(getTableType())); - } - - @Override - public String toString() { - return "SeqFile(" + path.toString() + ")"; - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java deleted file mode 100644 index ebdf319..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.io.seq; - -import org.apache.crunch.Pair; -import org.apache.crunch.TableSourceTarget; -import org.apache.crunch.io.FileNamingScheme; -import org.apache.crunch.io.SequentialFileNamingScheme; -import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; -import org.apache.crunch.types.PTableType; -import org.apache.hadoop.fs.Path; - -public class SeqFileTableSourceTarget extends ReadableSourcePathTargetImpl> implements - TableSourceTarget { - private final PTableType tableType; - - public SeqFileTableSourceTarget(String path, PTableType tableType) { - this(new Path(path), tableType); - } - - public SeqFileTableSourceTarget(Path path, PTableType tableType) { - this(path, tableType, new SequentialFileNamingScheme()); - } - - public SeqFileTableSourceTarget(Path path, PTableType tableType, FileNamingScheme fileNamingScheme) { - super(new SeqFileTableSource(path, tableType), new SeqFileTarget(path), fileNamingScheme); - this.tableType = tableType; - } - - @Override - public PTableType getTableType() { - return tableType; - } - - @Override - public String toString() { - return target.toString(); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java deleted file mode 100644 index 60e4739..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.io.seq; - -import org.apache.crunch.SourceTarget; -import org.apache.crunch.io.FileNamingScheme; -import org.apache.crunch.io.SequentialFileNamingScheme; -import org.apache.crunch.io.impl.FileTargetImpl; -import org.apache.crunch.types.PTableType; -import org.apache.crunch.types.PType; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; - -public class SeqFileTarget extends FileTargetImpl { - public SeqFileTarget(String path) { - this(new Path(path)); - } - - public SeqFileTarget(Path path) { - this(path, new SequentialFileNamingScheme()); - } - - public SeqFileTarget(Path path, FileNamingScheme fileNamingScheme) { - super(path, SequenceFileOutputFormat.class, fileNamingScheme); - } - - @Override - public String toString() { - return "SeqFile(" + path.toString() + ")"; - } - - @Override - public SourceTarget asSourceTarget(PType ptype) { - if (ptype instanceof PTableType) { - return new SeqFileTableSourceTarget(path, (PTableType) ptype); - } else { - return new SeqFileSourceTarget(path, ptype); - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java b/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java deleted file mode 100644 index 67a8870..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.text; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -class BZip2TextInputFormat extends FileInputFormat { - /** - * Treats keys as offset in file and value as line. Since the input file is - * compressed, the offset for a particular line is not well-defined. This - * implementation returns the starting position of a compressed block as the - * key for every line in that block. 
- */ - - private static class BZip2LineRecordReader extends RecordReader { - - private long start; - - private long end; - - private long pos; - - private CBZip2InputStream in; - - private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256); - - // flag to indicate if previous character read was Carriage Return ('\r') - // and the next character was not Line Feed ('\n') - private boolean CRFollowedByNonLF = false; - - // in the case where a Carriage Return ('\r') was not followed by a - // Line Feed ('\n'), this variable will hold that non Line Feed character - // that was read from the underlying stream. - private byte nonLFChar; - - /** - * Provide a bridge to get the bytes from the ByteArrayOutputStream without - * creating a new byte array. - */ - private static class TextStuffer extends OutputStream { - public Text target; - - @Override - public void write(int b) { - throw new UnsupportedOperationException("write(byte) not supported"); - } - - @Override - public void write(byte[] data, int offset, int len) throws IOException { - target.clear(); - target.set(data, offset, len); - } - } - - private TextStuffer bridge = new TextStuffer(); - - private LongWritable key = new LongWritable(); - private Text value = new Text(); - - public BZip2LineRecordReader(Configuration job, FileSplit split) throws IOException { - start = split.getStart(); - end = start + split.getLength(); - final Path file = split.getPath(); - - // open the file and seek to the start of the split - FileSystem fs = file.getFileSystem(job); - FSDataInputStream fileIn = fs.open(split.getPath()); - fileIn.seek(start); - - in = new CBZip2InputStream(fileIn, 9, end); - if (start != 0) { - // skip first line and re-establish "start". - // LineRecordReader.readLine(this.in, null); - readLine(this.in, null); - start = in.getPos(); - } - pos = in.getPos(); - } - - /* - * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added - * here locally. - */ - private long readLine(InputStream in, OutputStream out) throws IOException { - long bytes = 0; - while (true) { - int b = -1; - if (CRFollowedByNonLF) { - // In the previous call, a Carriage Return ('\r') was followed - // by a non Line Feed ('\n') character - in that call we would - // have not returned the non Line Feed character but would have - // read it from the stream - lets use that already read character - // now - b = nonLFChar; - CRFollowedByNonLF = false; - } else { - b = in.read(); - } - if (b == -1) { - break; - } - bytes += 1; - - byte c = (byte) b; - if (c == '\n') { - break; - } - - if (c == '\r') { - byte nextC = (byte) in.read(); - if (nextC != '\n') { - CRFollowedByNonLF = true; - nonLFChar = nextC; - } else { - bytes += 1; - } - break; - } - - if (out != null) { - out.write(c); - } - } - return bytes; - } - - /** Read a line. 
*/ - public boolean next(LongWritable key, Text value) throws IOException { - if (pos > end) - return false; - - key.set(pos); // key is position - buffer.reset(); - // long bytesRead = LineRecordReader.readLine(in, buffer); - long bytesRead = readLine(in, buffer); - if (bytesRead == 0) { - return false; - } - pos = in.getPos(); - // if we have read ahead because we encountered a carriage return - // char followed by a non line feed char, decrement the pos - if (CRFollowedByNonLF) { - pos--; - } - - bridge.target = value; - buffer.writeTo(bridge); - return true; - } - - /** - * Get the progress within the split - */ - @Override - public float getProgress() { - if (start == end) { - return 0.0f; - } else { - return Math.min(1.0f, (pos - start) / (float) (end - start)); - } - } - - @Override - public void close() throws IOException { - in.close(); - } - - @Override - public LongWritable getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public Text getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - // no op - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return next(key, value); - } - - } - - @Override - protected boolean isSplitable(JobContext context, Path file) { - return true; - } - - @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { - try { - return new BZip2LineRecordReader(context.getConfiguration(), (FileSplit) split); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java b/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java deleted file mode 100644 index 92bb787..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java +++ /dev/null @@ -1,980 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.text; - -import java.io.IOException; -import java.io.InputStream; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.compress.bzip2.BZip2Constants; -import org.apache.hadoop.mapreduce.InputSplit; - -/** - * An input stream that decompresses from the BZip2 format (without the file - * header chars) to be read as any other stream. 
- * - * @author Keiron Liddle - */ -class CBZip2InputStream extends InputStream implements BZip2Constants { - private static void cadvise(String reason) throws IOException { - throw new IOException(reason); - } - - private static void compressedStreamEOF() throws IOException { - cadvise("compressedStream EOF"); - } - - private void makeMaps() { - int i; - nInUse = 0; - for (i = 0; i < 256; i++) { - if (inUse[i]) { - seqToUnseq[nInUse] = (char) i; - unseqToSeq[i] = (char) nInUse; - nInUse++; - } - } - } - - /* - * index of the last char in the block, so the block size == last + 1. - */ - private int last; - - /* - * index in zptr[] of original string after sorting. - */ - private int origPtr; - - /* - * always: in the range 0 .. 9. The current block size is 100000 * this - * number. - */ - private int blockSize100k; - - private boolean blockRandomised; - - // a buffer to keep the read byte - private int bsBuff; - - // since bzip is bit-aligned at block boundaries there can be a case wherein - // only few bits out of a read byte are consumed and the remaining bits - // need to be consumed while processing the next block. - // indicate how many bits in bsBuff have not been processed yet - private int bsLive; - private CRC mCrc = new CRC(); - - private boolean[] inUse = new boolean[256]; - private int nInUse; - - private char[] seqToUnseq = new char[256]; - private char[] unseqToSeq = new char[256]; - - private char[] selector = new char[MAX_SELECTORS]; - private char[] selectorMtf = new char[MAX_SELECTORS]; - - private int[] tt; - private char[] ll8; - - /* - * freq table collected to save a pass over the data during decompression. - */ - private int[] unzftab = new int[256]; - - private int[][] limit = new int[N_GROUPS][MAX_ALPHA_SIZE]; - private int[][] base = new int[N_GROUPS][MAX_ALPHA_SIZE]; - private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE]; - private int[] minLens = new int[N_GROUPS]; - - private FSDataInputStream innerBsStream; - long readLimit = Long.MAX_VALUE; - - public long getReadLimit() { - return readLimit; - } - - public void setReadLimit(long readLimit) { - this.readLimit = readLimit; - } - - long readCount; - - public long getReadCount() { - return readCount; - } - - private boolean streamEnd = false; - - private int currentChar = -1; - - private static final int START_BLOCK_STATE = 1; - private static final int RAND_PART_A_STATE = 2; - private static final int RAND_PART_B_STATE = 3; - private static final int RAND_PART_C_STATE = 4; - private static final int NO_RAND_PART_A_STATE = 5; - private static final int NO_RAND_PART_B_STATE = 6; - private static final int NO_RAND_PART_C_STATE = 7; - - private int currentState = START_BLOCK_STATE; - - private int storedBlockCRC, storedCombinedCRC; - private int computedBlockCRC, computedCombinedCRC; - private boolean checkComputedCombinedCRC = true; - - int i2, count, chPrev, ch2; - int i, tPos; - int rNToGo = 0; - int rTPos = 0; - int j2; - char z; - - // see comment in getPos() - private long retPos = -1; - // the position offset which corresponds to the end of the InputSplit that - // will be processed by this instance - private long endOffsetOfSplit; - - private boolean signalToStopReading; - - public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end) throws IOException { - endOffsetOfSplit = end; - // initialize retPos to the beginning of the current InputSplit - // see comments in getPos() to understand how this is used. 
- retPos = zStream.getPos(); - ll8 = null; - tt = null; - checkComputedCombinedCRC = blockSize == -1; - bsSetStream(zStream); - initialize(blockSize); - initBlock(blockSize != -1); - setupBlock(); - } - - @Override - public int read() throws IOException { - if (streamEnd) { - return -1; - } else { - - // if we just started reading a bzip block which starts at a position - // >= end of current split, then we should set up retpos such that - // after a record is read, future getPos() calls will get a value - // > end of current split - this way we will read only one record out - // of this bzip block - the rest of the records from this bzip block - // should be read by the next map task while processing the next split - if (signalToStopReading) { - retPos = endOffsetOfSplit + 1; - } - - int retChar = currentChar; - switch (currentState) { - case START_BLOCK_STATE: - break; - case RAND_PART_A_STATE: - break; - case RAND_PART_B_STATE: - setupRandPartB(); - break; - case RAND_PART_C_STATE: - setupRandPartC(); - break; - case NO_RAND_PART_A_STATE: - break; - case NO_RAND_PART_B_STATE: - setupNoRandPartB(); - break; - case NO_RAND_PART_C_STATE: - setupNoRandPartC(); - break; - default: - break; - } - return retChar; - } - } - - /** - * getPos is used by the caller to know when the processing of the current - * {@link InputSplit} is complete. In this method, as we read each bzip block, - * we keep returning the beginning of the {@link InputSplit} as the return - * value until we hit a block which starts at a position >= end of current - * split. At that point we should set up retpos such that after a record is - * read, future getPos() calls will get a value > end of current split - this - * way we will read only one record out of that bzip block - the rest of the - * records from that bzip block should be read by the next map task while - * processing the next split - * - * @return - * @throws IOException - */ - public long getPos() throws IOException { - return retPos; - } - - private void initialize(int blockSize) throws IOException { - if (blockSize == -1) { - char magic1, magic2; - char magic3, magic4; - magic1 = bsGetUChar(); - magic2 = bsGetUChar(); - magic3 = bsGetUChar(); - magic4 = bsGetUChar(); - if (magic1 != 'B' || magic2 != 'Z' || magic3 != 'h' || magic4 < '1' || magic4 > '9') { - bsFinishedWithStream(); - streamEnd = true; - return; - } - blockSize = magic4 - '0'; - } - - setDecompressStructureSizes(blockSize); - computedCombinedCRC = 0; - } - - private final static long mask = 0xffffffffffffL; - private final static long eob = 0x314159265359L & mask; - private final static long eos = 0x177245385090L & mask; - - private void initBlock(boolean searchForMagic) throws IOException { - if (readCount >= readLimit) { - bsFinishedWithStream(); - streamEnd = true; - return; - } - - // position before beginning of bzip block header - long pos = innerBsStream.getPos(); - if (!searchForMagic) { - char magic1, magic2, magic3, magic4; - char magic5, magic6; - magic1 = bsGetUChar(); - magic2 = bsGetUChar(); - magic3 = bsGetUChar(); - magic4 = bsGetUChar(); - magic5 = bsGetUChar(); - magic6 = bsGetUChar(); - if (magic1 == 0x17 && magic2 == 0x72 && magic3 == 0x45 && magic4 == 0x38 && magic5 == 0x50 && magic6 == 0x90) { - complete(); - return; - } - - if (magic1 != 0x31 || magic2 != 0x41 || magic3 != 0x59 || magic4 != 0x26 || magic5 != 0x53 || magic6 != 0x59) { - badBlockHeader(); - streamEnd = true; - return; - } - } else { - long magic = 0; - for (int i = 0; i < 6; i++) { - magic <<= 8; - magic |= 
bsGetUChar(); - } - while (magic != eos && magic != eob) { - magic <<= 1; - magic &= mask; - magic |= bsR(1); - // if we just found the block header, the beginning of the bzip - // header would be 6 bytes before the current stream position - // when we eventually break from this while(), if it is because - // we found a block header then pos will have the correct start - // of header position - pos = innerBsStream.getPos() - 6; - } - if (magic == eos) { - complete(); - return; - } - - } - // if the previous block finished a few bits into the previous byte, - // then we will first be reading the remaining bits from the previous - // byte - so logically pos needs to be one behind - if (bsLive > 0) { - pos--; - } - - if (pos >= endOffsetOfSplit) { - // we have reached a block which begins exactly at the next InputSplit - // or >1 byte into the next InputSplit - lets record this fact - signalToStopReading = true; - } - storedBlockCRC = bsGetInt32(); - - if (bsR(1) == 1) { - blockRandomised = true; - } else { - blockRandomised = false; - } - - // currBlockNo++; - getAndMoveToFrontDecode(); - - mCrc.initialiseCRC(); - currentState = START_BLOCK_STATE; - } - - private void endBlock() throws IOException { - computedBlockCRC = mCrc.getFinalCRC(); - /* A bad CRC is considered a fatal error. */ - if (storedBlockCRC != computedBlockCRC) { - crcError(); - } - - computedCombinedCRC = (computedCombinedCRC << 1) | (computedCombinedCRC >>> 31); - computedCombinedCRC ^= computedBlockCRC; - } - - private void complete() throws IOException { - storedCombinedCRC = bsGetInt32(); - if (checkComputedCombinedCRC && storedCombinedCRC != computedCombinedCRC) { - crcError(); - } - if (innerBsStream.getPos() < endOffsetOfSplit) { - throw new IOException("Encountered additional bytes in the filesplit past the crc block. 
" - + "Loading of concatenated bz2 files is not supported"); - } - bsFinishedWithStream(); - streamEnd = true; - } - - private static void blockOverrun() throws IOException { - cadvise("block overrun"); - } - - private static void badBlockHeader() throws IOException { - cadvise("bad block header"); - } - - private static void crcError() throws IOException { - cadvise("CRC error"); - } - - private void bsFinishedWithStream() { - if (this.innerBsStream != null) { - if (this.innerBsStream != System.in) { - this.innerBsStream = null; - } - } - } - - private void bsSetStream(FSDataInputStream f) { - innerBsStream = f; - bsLive = 0; - bsBuff = 0; - } - - final private int readBs() throws IOException { - readCount++; - return innerBsStream.read(); - } - - private int bsR(int n) throws IOException { - int v; - while (bsLive < n) { - int zzi; - zzi = readBs(); - if (zzi == -1) { - compressedStreamEOF(); - } - bsBuff = (bsBuff << 8) | (zzi & 0xff); - bsLive += 8; - } - - v = (bsBuff >> (bsLive - n)) & ((1 << n) - 1); - bsLive -= n; - return v; - } - - private char bsGetUChar() throws IOException { - return (char) bsR(8); - } - - private int bsGetint() throws IOException { - int u = 0; - u = (u << 8) | bsR(8); - u = (u << 8) | bsR(8); - u = (u << 8) | bsR(8); - u = (u << 8) | bsR(8); - return u; - } - - private int bsGetIntVS(int numBits) throws IOException { - return bsR(numBits); - } - - private int bsGetInt32() throws IOException { - return bsGetint(); - } - - private void hbCreateDecodeTables(int[] limit, int[] base, int[] perm, char[] length, int minLen, int maxLen, - int alphaSize) { - int pp, i, j, vec; - - pp = 0; - for (i = minLen; i <= maxLen; i++) { - for (j = 0; j < alphaSize; j++) { - if (length[j] == i) { - perm[pp] = j; - pp++; - } - } - } - - for (i = 0; i < MAX_CODE_LEN; i++) { - base[i] = 0; - } - for (i = 0; i < alphaSize; i++) { - base[length[i] + 1]++; - } - - for (i = 1; i < MAX_CODE_LEN; i++) { - base[i] += base[i - 1]; - } - - for (i = 0; i < MAX_CODE_LEN; i++) { - limit[i] = 0; - } - vec = 0; - - for (i = minLen; i <= maxLen; i++) { - vec += (base[i + 1] - base[i]); - limit[i] = vec - 1; - vec <<= 1; - } - for (i = minLen + 1; i <= maxLen; i++) { - base[i] = ((limit[i - 1] + 1) << 1) - base[i]; - } - } - - private void recvDecodingTables() throws IOException { - char len[][] = new char[N_GROUPS][MAX_ALPHA_SIZE]; - int i, j, t, nGroups, nSelectors, alphaSize; - int minLen, maxLen; - boolean[] inUse16 = new boolean[16]; - - /* Receive the mapping table */ - for (i = 0; i < 16; i++) { - if (bsR(1) == 1) { - inUse16[i] = true; - } else { - inUse16[i] = false; - } - } - - for (i = 0; i < 256; i++) { - inUse[i] = false; - } - - for (i = 0; i < 16; i++) { - if (inUse16[i]) { - for (j = 0; j < 16; j++) { - if (bsR(1) == 1) { - inUse[i * 16 + j] = true; - } - } - } - } - - makeMaps(); - alphaSize = nInUse + 2; - - /* Now the selectors */ - nGroups = bsR(3); - nSelectors = bsR(15); - for (i = 0; i < nSelectors; i++) { - j = 0; - while (bsR(1) == 1) { - j++; - } - selectorMtf[i] = (char) j; - } - - /* Undo the MTF values for the selectors. 
*/ - { - char[] pos = new char[N_GROUPS]; - char tmp, v; - for (v = 0; v < nGroups; v++) { - pos[v] = v; - } - - for (i = 0; i < nSelectors; i++) { - v = selectorMtf[i]; - tmp = pos[v]; - while (v > 0) { - pos[v] = pos[v - 1]; - v--; - } - pos[0] = tmp; - selector[i] = tmp; - } - } - - /* Now the coding tables */ - for (t = 0; t < nGroups; t++) { - int curr = bsR(5); - for (i = 0; i < alphaSize; i++) { - while (bsR(1) == 1) { - if (bsR(1) == 0) { - curr++; - } else { - curr--; - } - } - len[t][i] = (char) curr; - } - } - - /* Create the Huffman decoding tables */ - for (t = 0; t < nGroups; t++) { - minLen = 32; - maxLen = 0; - for (i = 0; i < alphaSize; i++) { - if (len[t][i] > maxLen) { - maxLen = len[t][i]; - } - if (len[t][i] < minLen) { - minLen = len[t][i]; - } - } - hbCreateDecodeTables(limit[t], base[t], perm[t], len[t], minLen, maxLen, alphaSize); - minLens[t] = minLen; - } - } - - private void getAndMoveToFrontDecode() throws IOException { - char[] yy = new char[256]; - int i, j, nextSym, limitLast; - int EOB, groupNo, groupPos; - - limitLast = baseBlockSize * blockSize100k; - origPtr = bsGetIntVS(24); - - recvDecodingTables(); - EOB = nInUse + 1; - groupNo = -1; - groupPos = 0; - - /* - * Setting up the unzftab entries here is not strictly necessary, but it - * does save having to do it later in a separate pass, and so saves a - * block's worth of cache misses. - */ - for (i = 0; i <= 255; i++) { - unzftab[i] = 0; - } - - for (i = 0; i <= 255; i++) { - yy[i] = (char) i; - } - - last = -1; - - { - int zt, zn, zvec, zj; - if (groupPos == 0) { - groupNo++; - groupPos = G_SIZE; - } - groupPos--; - zt = selector[groupNo]; - zn = minLens[zt]; - zvec = bsR(zn); - while (zvec > limit[zt][zn]) { - zn++; - { - { - while (bsLive < 1) { - int zzi = 0; - try { - zzi = readBs(); - } catch (IOException e) { - compressedStreamEOF(); - } - if (zzi == -1) { - compressedStreamEOF(); - } - bsBuff = (bsBuff << 8) | (zzi & 0xff); - bsLive += 8; - } - } - zj = (bsBuff >> (bsLive - 1)) & 1; - bsLive--; - } - zvec = (zvec << 1) | zj; - } - nextSym = perm[zt][zvec - base[zt][zn]]; - } - - while (true) { - - if (nextSym == EOB) { - break; - } - - if (nextSym == RUNA || nextSym == RUNB) { - char ch; - int s = -1; - int N = 1; - do { - if (nextSym == RUNA) { - s = s + (0 + 1) * N; - } else if (nextSym == RUNB) { - s = s + (1 + 1) * N; - } - N = N * 2; - { - int zt, zn, zvec, zj; - if (groupPos == 0) { - groupNo++; - groupPos = G_SIZE; - } - groupPos--; - zt = selector[groupNo]; - zn = minLens[zt]; - zvec = bsR(zn); - while (zvec > limit[zt][zn]) { - zn++; - { - { - while (bsLive < 1) { - int zzi = 0; - try { - zzi = readBs(); - } catch (IOException e) { - compressedStreamEOF(); - } - if (zzi == -1) { - compressedStreamEOF(); - } - bsBuff = (bsBuff << 8) | (zzi & 0xff); - bsLive += 8; - } - } - zj = (bsBuff >> (bsLive - 1)) & 1; - bsLive--; - } - zvec = (zvec << 1) | zj; - } - nextSym = perm[zt][zvec - base[zt][zn]]; - } - } while (nextSym == RUNA || nextSym == RUNB); - - s++; - ch = seqToUnseq[yy[0]]; - unzftab[ch] += s; - - while (s > 0) { - last++; - ll8[last] = ch; - s--; - } - - if (last >= limitLast) { - blockOverrun(); - } - continue; - } else { - char tmp; - last++; - if (last >= limitLast) { - blockOverrun(); - } - - tmp = yy[nextSym - 1]; - unzftab[seqToUnseq[tmp]]++; - ll8[last] = seqToUnseq[tmp]; - - /* - * This loop is hammered during decompression, hence the unrolling. 
- * - * for (j = nextSym-1; j > 0; j--) yy[j] = yy[j-1]; - */ - - j = nextSym - 1; - for (; j > 3; j -= 4) { - yy[j] = yy[j - 1]; - yy[j - 1] = yy[j - 2]; - yy[j - 2] = yy[j - 3]; - yy[j - 3] = yy[j - 4]; - } - for (; j > 0; j--) { - yy[j] = yy[j - 1]; - } - - yy[0] = tmp; - { - int zt, zn, zvec, zj; - if (groupPos == 0) { - groupNo++; - groupPos = G_SIZE; - } - groupPos--; - zt = selector[groupNo]; - zn = minLens[zt]; - zvec = bsR(zn); - while (zvec > limit[zt][zn]) { - zn++; - { - { - while (bsLive < 1) { - int zzi; - char thech = 0; - try { - thech = (char) readBs(); - } catch (IOException e) { - compressedStreamEOF(); - } - zzi = thech; - bsBuff = (bsBuff << 8) | (zzi & 0xff); - bsLive += 8; - } - } - zj = (bsBuff >> (bsLive - 1)) & 1; - bsLive--; - } - zvec = (zvec << 1) | zj; - } - nextSym = perm[zt][zvec - base[zt][zn]]; - } - continue; - } - } - } - - private void setupBlock() throws IOException { - int[] cftab = new int[257]; - char ch; - - cftab[0] = 0; - for (i = 1; i <= 256; i++) { - cftab[i] = unzftab[i - 1]; - } - for (i = 1; i <= 256; i++) { - cftab[i] += cftab[i - 1]; - } - - for (i = 0; i <= last; i++) { - ch = ll8[i]; - tt[cftab[ch]] = i; - cftab[ch]++; - } - cftab = null; - - tPos = tt[origPtr]; - - count = 0; - i2 = 0; - ch2 = 256; /* not a char and not EOF */ - - if (blockRandomised) { - rNToGo = 0; - rTPos = 0; - setupRandPartA(); - } else { - setupNoRandPartA(); - } - } - - private void setupRandPartA() throws IOException { - if (i2 <= last) { - chPrev = ch2; - ch2 = ll8[tPos]; - tPos = tt[tPos]; - if (rNToGo == 0) { - rNToGo = rNums[rTPos]; - rTPos++; - if (rTPos == 512) { - rTPos = 0; - } - } - rNToGo--; - ch2 ^= ((rNToGo == 1) ? 1 : 0); - i2++; - - currentChar = ch2; - currentState = RAND_PART_B_STATE; - mCrc.updateCRC(ch2); - } else { - endBlock(); - initBlock(false); - setupBlock(); - } - } - - private void setupNoRandPartA() throws IOException { - if (i2 <= last) { - chPrev = ch2; - ch2 = ll8[tPos]; - tPos = tt[tPos]; - i2++; - - currentChar = ch2; - currentState = NO_RAND_PART_B_STATE; - mCrc.updateCRC(ch2); - } else { - endBlock(); - initBlock(false); - setupBlock(); - } - } - - private void setupRandPartB() throws IOException { - if (ch2 != chPrev) { - currentState = RAND_PART_A_STATE; - count = 1; - setupRandPartA(); - } else { - count++; - if (count >= 4) { - z = ll8[tPos]; - tPos = tt[tPos]; - if (rNToGo == 0) { - rNToGo = rNums[rTPos]; - rTPos++; - if (rTPos == 512) { - rTPos = 0; - } - } - rNToGo--; - z ^= ((rNToGo == 1) ? 
1 : 0); - j2 = 0; - currentState = RAND_PART_C_STATE; - setupRandPartC(); - } else { - currentState = RAND_PART_A_STATE; - setupRandPartA(); - } - } - } - - private void setupRandPartC() throws IOException { - if (j2 < (int) z) { - currentChar = ch2; - mCrc.updateCRC(ch2); - j2++; - } else { - currentState = RAND_PART_A_STATE; - i2++; - count = 0; - setupRandPartA(); - } - } - - private void setupNoRandPartB() throws IOException { - if (ch2 != chPrev) { - currentState = NO_RAND_PART_A_STATE; - count = 1; - setupNoRandPartA(); - } else { - count++; - if (count >= 4) { - z = ll8[tPos]; - tPos = tt[tPos]; - currentState = NO_RAND_PART_C_STATE; - j2 = 0; - setupNoRandPartC(); - } else { - currentState = NO_RAND_PART_A_STATE; - setupNoRandPartA(); - } - } - } - - private void setupNoRandPartC() throws IOException { - if (j2 < (int) z) { - currentChar = ch2; - mCrc.updateCRC(ch2); - j2++; - } else { - currentState = NO_RAND_PART_A_STATE; - i2++; - count = 0; - setupNoRandPartA(); - } - } - - private void setDecompressStructureSizes(int newSize100k) { - if (!(0 <= newSize100k && newSize100k <= 9 && 0 <= blockSize100k && blockSize100k <= 9)) { - // throw new IOException("Invalid block size"); - } - - blockSize100k = newSize100k; - - if (newSize100k == 0) { - return; - } - - int n = baseBlockSize * newSize100k; - ll8 = new char[n]; - tt = new int[n]; - } - - private static class CRC { - public static int crc32Table[] = { 0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b, - 0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61, 0x350c9b64, 0x31cd86d3, 0x3c8ea00a, - 0x384fbdbd, 0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9, 0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75, - 0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011, 0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd, 0x9823b6e0, - 0x9ce2ab57, 0x91a18d8e, 0x95609039, 0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5, 0xbe2b5b58, 0xbaea46ef, - 0xb7a96036, 0xb3687d81, 0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d, 0xd4326d90, 0xd0f37027, 0xddb056fe, - 0xd9714b49, 0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95, 0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1, - 0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d, 0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae, 0x278206ab, - 0x23431b1c, 0x2e003dc5, 0x2ac12072, 0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16, 0x018aeb13, 0x054bf6a4, - 0x0808d07d, 0x0cc9cdca, 0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde, 0x6b93dddb, 0x6f52c06c, 0x6211e6b5, - 0x66d0fb02, 0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066, 0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba, - 0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e, 0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692, 0x8aad2b2f, - 0x8e6c3698, 0x832f1041, 0x87ee0df6, 0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a, 0xe0b41de7, 0xe4750050, - 0xe9362689, 0xedf73b3e, 0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2, 0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, - 0xcbffd686, 0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a, 0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637, - 0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb, 0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f, 0x5c007b8a, - 0x58c1663d, 0x558240e4, 0x51435d53, 0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47, 0x36194d42, 0x32d850f5, - 0x3f9b762c, 0x3b5a6b9b, 0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff, 0x1011a0fa, 0x14d0bd4d, 0x19939b94, - 0x1d528623, 0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7, 0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b, - 0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f, 0xc423cd6a, 
0xc0e2d0dd, 0xcda1f604, 0xc960ebb3, 0xbd3e8d7e, - 0xb9ff90c9, 0xb4bcb610, 0xb07daba7, 0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b, 0x9b3660c6, 0x9ff77d71, - 0x92b45ba8, 0x9675461f, 0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3, 0x5d8a9099, 0x594b8d2e, 0x5408abf7, - 0x50c9b640, 0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c, 0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8, - 0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24, 0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30, 0x029f3d35, - 0x065e2082, 0x0b1d065b, 0x0fdc1bec, 0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088, 0x2497d08d, 0x2056cd3a, - 0x2d15ebe3, 0x29d4f654, 0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0, 0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, - 0xdbee767c, 0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18, 0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4, - 0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c, 0xafb010b1, - 0xab710d06, 0xa6322bdf, 0xa2f33668, 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4 }; - - public CRC() { - initialiseCRC(); - } - - void initialiseCRC() { - globalCrc = 0xffffffff; - } - - int getFinalCRC() { - return ~globalCrc; - } - - void updateCRC(int inCh) { - int temp = (globalCrc >> 24) ^ inCh; - if (temp < 0) { - temp = 256 + temp; - } - globalCrc = (globalCrc << 8) ^ CRC.crc32Table[temp]; - } - - int globalCrc; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java deleted file mode 100644 index 9438014..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.text; - -import java.util.Iterator; -import java.util.List; -import java.util.StringTokenizer; - -import org.apache.crunch.MapFn; -import org.apache.crunch.Pair; -import org.apache.crunch.fn.CompositeMapFn; -import org.apache.crunch.fn.IdentityFn; -import org.apache.crunch.types.PTableType; -import org.apache.crunch.types.PType; - -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; - -/** - * An abstraction for parsing the lines of a text file using a {@code PType} to - * convert the lines of text into a given data type. 
- *
- * @param <T> The type returned by the text parsing
- */
-abstract class LineParser<T> {
-
-  public static <T> LineParser<T> forType(PType<T> ptype) {
-    return new SimpleLineParser<T>(ptype);
-  }
-
-  public static <K, V> LineParser<Pair<K, V>> forTableType(PTableType<K, V> ptt, String sep) {
-    return new KeyValueLineParser<K, V>(ptt, sep);
-  }
-
-  private MapFn<String, T> mapFn;
-
-  public void initialize() {
-    mapFn = getMapFn();
-    mapFn.initialize();
-  }
-
-  public T parse(String line) {
-    return mapFn.map(line);
-  }
-
-  protected abstract MapFn<String, T> getMapFn();
-
-  private static <T> MapFn<String, T> getMapFnForPType(PType<T> ptype) {
-    MapFn ret = null;
-    if (String.class.equals(ptype.getTypeClass())) {
-      ret = (MapFn) IdentityFn.getInstance();
-    } else {
-      // Check for a composite MapFn for the PType.
-      // Note that this won't work for Avro-- need to solve that.
-      ret = ptype.getInputMapFn();
-      if (ret instanceof CompositeMapFn) {
-        ret = ((CompositeMapFn) ret).getSecond();
-      }
-    }
-    return ret;
-  }
-
-  private static class SimpleLineParser<T> extends LineParser<T> {
-
-    private final PType<T> ptype;
-
-    public SimpleLineParser(PType<T> ptype) {
-      this.ptype = ptype;
-    }
-
-    @Override
-    protected MapFn<String, T> getMapFn() {
-      return getMapFnForPType(ptype);
-    }
-  }
-
-  private static class KeyValueLineParser<K, V> extends LineParser<Pair<K, V>> {
-
-    private final PTableType<K, V> ptt;
-    private final String sep;
-
-    public KeyValueLineParser(PTableType<K, V> ptt, String sep) {
-      this.ptt = ptt;
-      this.sep = sep;
-    }
-
-    @Override
-    protected MapFn<String, Pair<K, V>> getMapFn() {
-      final MapFn<String, K> keyMapFn = getMapFnForPType(ptt.getKeyType());
-      final MapFn<String, V> valueMapFn = getMapFnForPType(ptt.getValueType());
-
-      return new MapFn<String, Pair<K, V>>() {
-        @Override
-        public void initialize() {
-          keyMapFn.initialize();
-          valueMapFn.initialize();
-        }
-
-        @Override
-        public Pair<K, V> map(String input) {
-          List<String> kv = ImmutableList.copyOf(Splitter.on(sep).limit(2).split(input));
-          if (kv.size() != 2) {
-            throw new RuntimeException("Invalid input string: " + input);
-          }
-          return Pair.of(keyMapFn.map(kv.get(0)), valueMapFn.map(kv.get(1)));
-        }
-      };
-    }
-  }
-}
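
For reference, a standalone sketch of the split behaviour that KeyValueLineParser above relies on: Guava's Splitter with limit(2) breaks a line at the first separator only, so any later separators remain part of the value. This is plain Guava outside of Crunch, and the sample line is illustrative:

    import java.util.List;

    import com.google.common.base.Splitter;
    import com.google.common.collect.ImmutableList;

    public class KeyValueSplitSketch {
      public static void main(String[] args) {
        // Split at the first tab only; everything after it is the value.
        List<String> kv = ImmutableList.copyOf(
            Splitter.on("\t").limit(2).split("user42\tGET\t/index.html"));
        System.out.println(kv.get(0)); // user42
        System.out.println(kv.get(1)); // GET<tab>/index.html
      }
    }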

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
deleted file mode 100644
index 40e2dbd..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.IOException;
-
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
-
-/**
- * A {@code Source} instance that uses the {@code NLineInputFormat}, which gives each map
- * task a fraction of the lines in a text file as input. Most useful when running simulations
- * on Hadoop, where each line represents configuration information about each simulation
- * run.
- */
-public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
-  private static FormatBundle getBundle(int linesPerTask) {
-    FormatBundle bundle = FormatBundle.forInput(NLineInputFormat.class);
-    bundle.set(NLineInputFormat.LINES_PER_MAP, String.valueOf(linesPerTask));
-    return bundle;
-  }
-
-  /**
-   * Create a new {@code NLineFileSource} instance.
-   *
-   * @param path The path to the input data, as a String
-   * @param ptype The PType to use for processing the data
-   * @param linesPerTask The number of lines from the input each map task will process
-   */
-  public NLineFileSource(String path, PType<T> ptype, int linesPerTask) {
-    this(new Path(path), ptype, linesPerTask);
-  }
-
-  /**
-   * Create a new {@code NLineFileSource} instance.
-   *
-   * @param path The {@code Path} to the input data
-   * @param ptype The PType to use for processing the data
-   * @param linesPerTask The number of lines from the input each map task will process
-   */
-  public NLineFileSource(Path path, PType<T> ptype, int linesPerTask) {
-    super(path, ptype, getBundle(linesPerTask));
-  }
-
-  @Override
-  public String toString() {
-    return "NLine(" + path + ")";
-  }
-
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(path.getFileSystem(conf), path,
-        new TextFileReaderFactory<T>(LineParser.forType(ptype)));
-  }
-}
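
For context, a minimal usage sketch of the NLineFileSource removed above (the class is re-added under crunch-core as part of this rename). The driver class, paths, and lines-per-task value are illustrative assumptions, not part of this commit:

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.text.NLineFileSource;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;

    public class SimulationDriver {
      public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(SimulationDriver.class, new Configuration());
        // Each map task receives exactly 10 parameter lines rather than a whole input split.
        PCollection<String> params = pipeline.read(
            new NLineFileSource<String>("/data/simulation-params.txt", Writables.strings(), 10));
        pipeline.writeTextFile(params, "/out/simulation-params-echo");
        pipeline.done();
      }
    }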

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
deleted file mode 100644
index e1fea6e..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
-
-  private static final Log LOG = LogFactory.getLog(TextFileReaderFactory.class);
-
-  private final LineParser<T> parser;
-
-  public TextFileReaderFactory(PType<T> ptype) {
-    this(LineParser.forType(ptype));
-  }
-
-  public TextFileReaderFactory(LineParser<T> parser) {
-    this.parser = parser;
-  }
-
-  @Override
-  public Iterator<T> read(FileSystem fs, Path path) {
-    parser.initialize();
-
-    FSDataInputStream is;
-    try {
-      is = fs.open(path);
-    } catch (IOException e) {
-      LOG.info("Could not read path: " + path, e);
-      return Iterators.emptyIterator();
-    }
-
-    final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-    return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
-      private String nextLine;
-
-      @Override
-      public boolean hasNext() {
-        try {
-          return (nextLine = reader.readLine()) != null;
-        } catch (IOException e) {
-          LOG.info("Exception reading text file stream", e);
-          return false;
-        }
-      }
-
-      @Override
-      public T next() {
-        return parser.parse(nextLine);
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
deleted file mode 100644
index 026fca9..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.IOException;
-
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.AvroUtf8InputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
-  private static boolean isBZip2(Path path) {
-    String strPath = path.toString();
-    return strPath.endsWith(".bz") || strPath.endsWith(".bz2");
-  }
-
-  private static Class<? extends FileInputFormat<?, ?>> getInputFormat(Path path, PType<?> ptype) {
-    if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
-      return AvroUtf8InputFormat.class;
-    } else if (isBZip2(path)) {
-      return BZip2TextInputFormat.class;
-    } else {
-      return TextInputFormat.class;
-    }
-  }
-
-  public TextFileSource(Path path, PType<T> ptype) {
-    super(path, ptype, getInputFormat(path, ptype));
-  }
-
-  @Override
-  public long getSize(Configuration conf) {
-    long sz = super.getSize(conf);
-    if (isBZip2(path)) {
-      sz *= 10; // Arbitrary compression factor
-    }
-    return sz;
-  }
-
-  @Override
-  public String toString() {
-    return "Text(" + path + ")";
-  }
-
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(path.getFileSystem(conf), path,
-        new TextFileReaderFactory<T>(LineParser.forType(ptype)));
-  }
-}
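
The getSize override above scales the on-disk size of .bz/.bz2 inputs by a fixed factor of 10 as a rough stand-in for the decompressed size. A minimal read sketch; the pipeline class and paths are illustrative, and the .bz2 suffix is what routes the source through BZip2TextInputFormat:

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.text.TextFileSource;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class BZip2TextRead {
      public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(BZip2TextRead.class, new Configuration());
        // getInputFormat selects BZip2TextInputFormat because the path ends in ".bz2".
        PCollection<String> lines = pipeline.read(
            new TextFileSource<String>(new Path("/data/logs/events.txt.bz2"), Writables.strings()));
        pipeline.writeTextFile(lines, "/out/events-plain");
        pipeline.done();
      }
    }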

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
deleted file mode 100644
index 1d1211e..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-
-public class TextFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-
-  public TextFileSourceTarget(String path, PType<T> ptype) {
-    this(new Path(path), ptype);
-  }
-
-  public TextFileSourceTarget(Path path, PType<T> ptype) {
-    this(path, ptype, new SequentialFileNamingScheme());
-  }
-
-  public TextFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) {
-    super(new TextFileSource<T>(path, ptype), new TextFileTarget(path), fileNamingScheme);
-  }
-
-  @Override
-  public String toString() {
-    return target.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
deleted file mode 100644
index 94fc5fd..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import java.io.IOException;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileTableSourceImpl;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
-
-/**
- * A {@code Source} that uses the {@code KeyValueTextInputFormat} to process
- * input text. If a separator for the keys and values in the text file is not specified,
- * a tab character is used.
- */
-public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V>
-    implements ReadableSource<Pair<K, V>> {
-
-  // CRUNCH-125: Maintain compatibility with both versions of the KeyValueTextInputFormat's
-  // configuration field for specifying the separator character.
-  private static final String OLD_KV_SEP = "key.value.separator.in.input.line";
-  private static final String NEW_KV_SEP = "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
-
-  private static FormatBundle getBundle(String sep) {
-    FormatBundle bundle = FormatBundle.forInput(KeyValueTextInputFormat.class);
-    bundle.set(OLD_KV_SEP, sep);
-    bundle.set(NEW_KV_SEP, sep);
-    return bundle;
-  }
-
-  private final String separator;
-
-  public TextFileTableSource(String path, PTableType<K, V> tableType) {
-    this(new Path(path), tableType);
-  }
-
-  public TextFileTableSource(Path path, PTableType<K, V> tableType) {
-    this(path, tableType, "\t");
-  }
-
-  public TextFileTableSource(String path, PTableType<K, V> tableType, String separator) {
-    this(new Path(path), tableType, separator);
-  }
-
-  public TextFileTableSource(Path path, PTableType<K, V> tableType, String separator) {
-    super(path, tableType, getBundle(separator));
-    this.separator = separator;
-  }
-
-  @Override
-  public String toString() {
-    return "KeyValueText(" + path + ")";
-  }
-
-  @Override
-  public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(path.getFileSystem(conf), path,
-        new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(), separator)));
-  }
-}
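
For context, a minimal usage sketch of the TextFileTableSource removed above. Because getBundle sets both the old and the new Hadoop property names (the CRUNCH-125 note), the same separator takes effect regardless of which KeyValueTextInputFormat version is on the cluster. The driver class, paths, and comma separator are illustrative assumptions:

    import org.apache.crunch.PTable;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.text.TextFileTableSource;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class VisitsByUser {
      public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(VisitsByUser.class, new Configuration());
        // A line such as "user42,www.example.org" is split at the first ',' into (key, value).
        PTable<String, String> visits = pipeline.read(
            new TextFileTableSource<String, String>(new Path("/data/visits.csv"),
                Writables.tableOf(Writables.strings(), Writables.strings()), ","));
        pipeline.writeTextFile(visits, "/out/visits-echo");
        pipeline.done();
      }
    }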

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
deleted file mode 100644
index dec97e5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.text;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.TableSourceTarget;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@code TableSource} and {@code SourceTarget} implementation that uses the
- * {@code KeyValueTextInputFormat} and {@code TextOutputFormat} to support reading
- * and writing text files as {@code PTable} instances using a tab separator for
- * the keys and the values.
- */
-public class TextFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K, V>> implements
-    TableSourceTarget<K, V> {
-
-  private final PTableType<K, V> tableType;
-
-  public TextFileTableSourceTarget(String path, PTableType<K, V> tableType) {
-    this(new Path(path), tableType);
-  }
-
-  public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
-    this(path, tableType, new SequentialFileNamingScheme());
-  }
-
-  public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType,
-      FileNamingScheme fileNamingScheme) {
-    super(new TextFileTableSource<K, V>(path, tableType), new TextFileTarget(path),
-        fileNamingScheme);
-    this.tableType = tableType;
-  }
-
-  @Override
-  public PTableType<K, V> getTableType() {
-    return tableType;
-  }
-
-  @Override
-  public String toString() {
-    return target.toString();
-  }
-}
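
Finally, a minimal sketch of the TextFileTableSourceTarget removed above acting as both a source and a target, per its Javadoc: tab-separated (key, value) text read in as a PTable and written back out in the same format. Class names and paths are illustrative assumptions:

    import org.apache.crunch.PTable;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.TableSourceTarget;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.text.TextFileTableSourceTarget;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class TabSeparatedCopy {
      public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(TabSeparatedCopy.class, new Configuration());

        TableSourceTarget<String, String> input = new TextFileTableSourceTarget<String, String>(
            new Path("/data/word-counts"), Writables.tableOf(Writables.strings(), Writables.strings()));
        TableSourceTarget<String, String> output = new TextFileTableSourceTarget<String, String>(
            new Path("/out/word-counts-copy"), Writables.tableOf(Writables.strings(), Writables.strings()));

        // The same class serves as a source here and as a target below.
        PTable<String, String> counts = pipeline.read(input);
        pipeline.write(counts, output);
        pipeline.done();
      }
    }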