Date: Fri, 28 Apr 2017 18:00:23 -0700
From: "Michael Blow (Code Review)"
CC: Ian Maxon, Jenkins
Reply-To: mblow@apache.org
Subject: Change in asterixdb[master]: Remove Unused / Historical Hyracks Modules
X-Gerrit-MessageType: merged
X-Gerrit-Change-Id: Iaa058eb7c73696e1ead2c05c1ee34dbe9055ce52
X-Gerrit-Commit: ef36076750e1ece71acd6ce849e7d74897346919
Message-Id: <20170429010023.2A92A24212E@unhygienix.ics.uci.edu>

Michael Blow has submitted this change and it was merged.

Change subject: Remove Unused / Historical Hyracks Modules
......................................................................
Remove Unused / Historical Hyracks Modules

Change-Id: Iaa058eb7c73696e1ead2c05c1ee34dbe9055ce52
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1714
Sonar-Qube: Jenkins
Tested-by: Jenkins
BAD: Jenkins
Integration-Tests: Jenkins
Reviewed-by: Ian Maxon
---
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DatatypeHelper.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/DuplicateKeyMapper.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/MRContextUtil.java
D hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/util/PreappendLongWritableMapper.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/HyracksYarnApplicationMaster.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/AbstractProcess.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ClusterController.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ContainerSpecification.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/HyracksCluster.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/ManifestParser.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/org/apache/hyracks/yarn/am/manifest/NodeController.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/assembly/binary-assembly.xml
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/KillHyracksApplication.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/org/apache/hyracks/yarn/client/LaunchHyracksApplication.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/amrm/AMRMConnection.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnApplication.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/protocols/clientrm/YarnClientRMConnection.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/LocalResourceHelper.java
D hyracks-fullstack/hyracks/hyracks-yarn/hyracks-yarn-common/src/main/java/org/apache/hyracks/yarn/common/resources/ResourceHelper.java
D hyracks-fullstack/hyracks/hyracks-yarn/pom.xml
53 files changed,
0 insertions(+), 5,263 deletions(-) Approvals: Ian Maxon: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml deleted file mode 100644 index cd0b30a..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/pom.xml +++ /dev/null @@ -1,85 +0,0 @@ - - - - 4.0.0 - hyracks-dataflow-hadoop - hyracks-dataflow-hadoop - - - org.apache.hyracks - hyracks - 0.2.18-SNAPSHOT - - - - - Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - repo - A business-friendly OSS license - - - - - ${basedir}/../.. - - - - - org.apache.hyracks - hyracks-api - ${project.version} - jar - compile - - - org.apache.hyracks - hyracks-dataflow-common - ${project.version} - jar - compile - - - org.apache.hyracks - hyracks-hdfs-2.x - ${project.version} - jar - compile - - - org.apache.hadoop - hadoop-client - jar - compile - - - edu.uci.ics.dcache - dcache-client - 0.0.1 - compile - - - org.apache.hyracks - hyracks-dataflow-std - ${project.version} - compile - - - diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java deleted file mode 100644 index 226250e..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop; - -import java.io.IOException; -import java.util.Map; -import java.util.StringTokenizer; - -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -import edu.uci.ics.dcache.client.DCacheClient; -import org.apache.hyracks.api.dataflow.IDataWriter; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper; -import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory; -import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; - -public abstract class AbstractHadoopOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { - - protected transient JobConf jobConf; - - protected static class DataWritingOutputCollector implements OutputCollector { - private IDataWriter writer; - - public DataWritingOutputCollector() { - } - - public DataWritingOutputCollector(IDataWriter writer) { - this.writer = writer; - } - - @Override - public void collect(Object key, Object value) throws IOException { - writer.writeData(new Object[] { key, value }); - } - - public void setWriter(IDataWriter writer) { - this.writer = writer; - } - } - - public static String MAPRED_CACHE_FILES = "mapred.cache.files"; - public static String MAPRED_CACHE_LOCALFILES = "mapred.cache.localFiles"; - - private static final long serialVersionUID = 1L; - private final Map jobConfMap; - private IHadoopClassFactory hadoopClassFactory; - - public AbstractHadoopOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, - RecordDescriptor recordDescriptor, JobConf jobConf, IHadoopClassFactory hadoopOperatorFactory) { - super(spec, inputArity, 1); - jobConfMap = DatatypeHelper.jobConf2Map(jobConf); - this.hadoopClassFactory = hadoopOperatorFactory; - recordDescriptors[0] = recordDescriptor; - } - - public Map getJobConfMap() { - return jobConfMap; - } - - public IHadoopClassFactory getHadoopClassFactory() { - return hadoopClassFactory; - } - - public void setHadoopClassFactory(IHadoopClassFactory hadoopClassFactory) { - this.hadoopClassFactory = hadoopClassFactory; - } - - protected Reporter createReporter() { - return new Reporter() { - @Override - public Counter getCounter(Enum name) { - return null; - } - - @Override - public Counter getCounter(String group, String name) { - return null; - } - - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - return null; - } - - @Override - public void incrCounter(Enum key, long amount) { - - } - - @Override - public void incrCounter(String group, String counter, long amount) { - - } - - @Override - public void progress() { - - } - - @Override - public void setStatus(String status) { - - } - - @Override - public float getProgress() { - return 0.0f; - } - }; - } - - public JobConf getJobConf() { - if (jobConf == null) { - jobConf = DatatypeHelper.map2JobConf(jobConfMap); - jobConf.setClassLoader(this.getClass().getClassLoader()); - } - return jobConf; - } - - public void populateCache(JobConf jobConf) { - try { - String cache = jobConf.get(MAPRED_CACHE_FILES); - System.out.println("cache:" + cache); - if (cache == null) { - return; - } - String localCache = jobConf.get(MAPRED_CACHE_LOCALFILES); - System.out.println("localCache:" + localCache); - if 
(localCache != null) { - return; - } - localCache = ""; - StringTokenizer cacheTokenizer = new StringTokenizer(cache, ","); - while (cacheTokenizer.hasMoreTokens()) { - if (!"".equals(localCache)) { - localCache += ","; - } - try { - localCache += DCacheClient.get().get(cacheTokenizer.nextToken()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - jobConf.set(MAPRED_CACHE_LOCALFILES, localCache); - System.out.println("localCache:" + localCache); - } catch (Exception e) { - - } - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java deleted file mode 100644 index b328f29..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop; - -import java.io.IOException; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileRecordReader; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.StatusReporter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOpenableDataWriter; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.common.comm.io.SerializingDataWriter; -import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper; -import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory; -import org.apache.hyracks.dataflow.hadoop.util.InputSplitsProxy; -import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; -import org.apache.hyracks.dataflow.std.base.IOpenableDataWriterOperator; -import org.apache.hyracks.dataflow.std.util.DeserializedOperatorNodePushable; -import org.apache.hyracks.hdfs.ContextFactory; - -public class HadoopMapperOperatorDescriptor extends AbstractHadoopOperatorDescriptor { - - private class MapperBaseOperator { - protected OutputCollector output; - protected Reporter reporter; - protected Object mapper; - // protected Mapper mapper; - protected int partition; - protected JobConf conf; - protected IOpenableDataWriter writer; - protected boolean newMapreduceLib = false; - org.apache.hadoop.mapreduce.Mapper.Context context; - - public MapperBaseOperator(int partition) { - this.partition = partition; - } - - protected void initializeMapper() throws HyracksDataException { - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - jobConf = getJobConf(); - populateCache(jobConf); - conf = new JobConf(jobConf); - conf.setClassLoader(jobConf.getClassLoader()); - reporter = createReporter(); - } - - protected void map(Object[] data) throws HyracksDataException { - try { - if (!conf.getUseNewMapper()) { - ((org.apache.hadoop.mapred.Mapper) mapper).map((K1) data[0], (V1) data[1], output, reporter); - } else - throw new IllegalStateException( - " Incorrect map method called for MapReduce code written using mapreduce package"); - } catch (IOException e) { - throw new HyracksDataException(e); - } catch (RuntimeException re) { - System.out.println(" Runtime exceptione encoutered for row :" + data[0] + ": " + data[1]); - re.printStackTrace(); - } - } - - protected void closeMapper() throws HyracksDataException { - try { - if (!conf.getUseNewMapper()) { - ((org.apache.hadoop.mapred.Mapper) mapper).close(); - } else { - // 
do nothing. closing the mapper is handled internally by - // run method on context. - } - } catch (IOException ioe) { - throw new HyracksDataException(ioe); - } - } - - } - - private class MapperOperator extends MapperBaseOperator implements IOpenableDataWriterOperator { - - public MapperOperator(int partition) { - super(partition); - }; - - @Override - public void close() throws HyracksDataException { - super.closeMapper(); - writer.close(); - } - - @Override - public void fail() throws HyracksDataException { - writer.fail(); - } - - @Override - public void open() throws HyracksDataException { - initializeMapper(); - writer.open(); - output = new DataWritingOutputCollector(writer); - } - - @Override - public void writeData(Object[] data) throws HyracksDataException { - super.map(data); - } - - public void setDataWriter(int index, IOpenableDataWriter writer) { - if (index != 0) { - throw new IllegalArgumentException(); - } - this.writer = writer; - } - - protected void initializeMapper() throws HyracksDataException { - super.initializeMapper(); - try { - mapper = createMapper(conf); - } catch (Exception e) { - throw new HyracksDataException(e); - } - if (!conf.getUseNewMapper()) { - ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf); - } - } - - @Override - public void flush() throws HyracksDataException { - } - - } - - private class ReaderMapperOperator extends MapperBaseOperator { - - public ReaderMapperOperator(int partition, IOpenableDataWriter writer) throws HyracksDataException { - super(partition); - output = new DataWritingOutputCollector(writer); - this.writer = writer; - this.writer.open(); - } - - protected void updateConfWithSplit(JobConf conf) { - try { - if (inputSplits == null) { - inputSplits = inputSplitsProxy.toInputSplits(conf); - } - Object splitRead = inputSplits[partition]; - if (splitRead instanceof FileSplit) { - conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString()); - conf.setLong("map.input.start", ((FileSplit) splitRead).getStart()); - conf.setLong("map.input.length", ((FileSplit) splitRead).getLength()); - } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) { - conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath() - .toString()); - conf.setLong("map.input.start", - ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart()); - conf.setLong("map.input.length", - ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength()); - } - } catch (Exception e) { - e.printStackTrace(); - // we do not throw the exception here as we are setting - // additional parameters that may not be - // required by the mapper. If they are indeed required, the - // configure method invoked on the mapper - // shall report an exception because of the missing parameters. 
- } - } - - protected void initializeMapper() throws HyracksDataException { - super.initializeMapper(); - updateConfWithSplit(conf); - try { - mapper = createMapper(conf); - } catch (Exception e) { - throw new HyracksDataException(e); - } - if (!conf.getUseNewMapper()) { - ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf); - } - } - - public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException { - try { - initializeMapper(); - conf.setClassLoader(this.getClass().getClassLoader()); - Object reader; - Object key = null; - Object value = null; - Object inputSplit = inputSplits[partition]; - reader = getRecordReader(conf, inputSplit); - final Object[] data = new Object[2]; - if (conf.getUseNewMapper()) { - org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader; - org.apache.hadoop.mapreduce.RecordWriter recordWriter = new RecordWriter() { - - @Override - public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { - // TODO Auto-generated method stub - } - - @Override - public void write(Object key, Object value) throws IOException, InterruptedException { - data[0] = key; - data[1] = value; - writer.writeData(data); - } - };;; - - OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat() - .getOutputCommitter(new ContextFactory().createContext(conf, new TaskAttemptID())); - StatusReporter statusReporter = new StatusReporter() { - @Override - public void setStatus(String arg0) { - } - - @Override - public void progress() { - } - - @Override - public Counter getCounter(String arg0, String arg1) { - return null; - } - - @Override - public Counter getCounter(Enum arg0) { - return null; - } - - @Override - public float getProgress() { - // TODO Auto-generated method stub - return 0; - } - };;; - context = new MRContextUtil().createMapContext(conf, new TaskAttemptID(), newReader, recordWriter, - outputCommitter, statusReporter, (org.apache.hadoop.mapreduce.InputSplit) inputSplit); - newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context); - ((org.apache.hadoop.mapreduce.Mapper) mapper).run(context); - } else { - Class inputKeyClass = null; - Class inputValueClass = null; - - RecordReader oldReader = (RecordReader) reader; - if (reader instanceof SequenceFileRecordReader) { - inputKeyClass = ((SequenceFileRecordReader) oldReader).getKeyClass(); - inputValueClass = ((SequenceFileRecordReader) oldReader).getValueClass(); - } else { - inputKeyClass = oldReader.createKey().getClass(); - inputValueClass = oldReader.createValue().getClass(); - } - key = oldReader.createKey(); - value = oldReader.createValue(); - while (oldReader.next(key, value)) { - data[0] = key; - data[1] = value; - super.map(data); - } - oldReader.close(); - } - - } catch (IOException e) { - throw new HyracksDataException(e); - } - - } - - public void close() throws HyracksDataException { - super.closeMapper(); - writer.close(); - } - } - - private static final long serialVersionUID = 1L; - private Class mapperClass; - private InputSplitsProxy inputSplitsProxy; - private transient Object[] inputSplits; - private boolean selfRead = false; - - private void initializeSplitInfo(Object[] splits) throws IOException { - jobConf = super.getJobConf(); - InputFormat inputFormat = jobConf.getInputFormat(); - inputSplitsProxy = new InputSplitsProxy(jobConf, splits); - } - - public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf, - 
IHadoopClassFactory hadoopClassFactory) throws IOException { - super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory); - } - - public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf, Object[] splits, - IHadoopClassFactory hadoopClassFactory) throws IOException { - super(spec, 0, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory); - initializeSplitInfo(splits); - this.selfRead = true; - } - - public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory hadoopClassFactory) { - RecordDescriptor recordDescriptor = null; - String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName(); - String mapOutputValueClassName = conf.getMapOutputValueClass().getName(); - try { - if (hadoopClassFactory == null) { - recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor( - (Class) Class.forName(mapOutputKeyClassName), - (Class) Class.forName(mapOutputValueClassName)); - } else { - recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor( - (Class) hadoopClassFactory.loadClass(mapOutputKeyClassName), - (Class) hadoopClassFactory.loadClass(mapOutputValueClassName)); - } - } catch (Exception e) { - e.printStackTrace(); - } - return recordDescriptor; - } - - private Object createMapper(JobConf conf) throws Exception { - Object mapper; - if (mapperClass != null) { - return ReflectionUtils.newInstance(mapperClass, conf); - } else { - String mapperClassName = null; - if (jobConf.getUseNewMapper()) { - JobContext jobContext = new ContextFactory().createJobContext(conf); - mapperClass = jobContext.getMapperClass(); - mapperClassName = mapperClass.getName(); - } else { - mapperClass = conf.getMapperClass(); - mapperClassName = mapperClass.getName(); - } - mapper = getHadoopClassFactory().createMapper(mapperClassName, conf); - } - return mapper; - } - - private Object getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException, - InterruptedException { - if (conf.getUseNewMapper()) { - JobContext context = new ContextFactory().createJobContext(conf); - org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils - .newInstance(context.getInputFormatClass(), conf); - TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(conf, new TaskAttemptID()); - return inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit) inputSplit, - taskAttemptContext); - } else { - Class inputFormatClass = conf.getInputFormat().getClass(); - InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf); - return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, - super.createReporter()); - } - } - - @Override - public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - - JobConf conf = getJobConf(); - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - try { - if (selfRead) { - RecordDescriptor recordDescriptor = null; - if (inputSplits == null) { - inputSplits = inputSplitsProxy.toInputSplits(conf); - } - Object reader = getRecordReader(conf, inputSplits[partition]); - if (conf.getUseNewMapper()) { - org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader; - 
newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplits[partition], - new ContextFactory().createContext(conf, new TaskAttemptID())); - newReader.nextKeyValue(); - Object key = newReader.getCurrentKey(); - Class keyClass = null; - if (key == null) { - keyClass = Class.forName("org.apache.hadoop.io.NullWritable"); - } - recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor( - (Class) keyClass, (Class) newReader - .getCurrentValue().getClass()); - } else { - RecordReader oldReader = (RecordReader) reader; - recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor( - (Class) oldReader.createKey().getClass(), - (Class) oldReader.createValue().getClass()); - } - return createSelfReadingMapper(ctx, recordDescriptor, partition); - } else { - return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), - recordDescProvider.getInputRecordDescriptor(this.activityNodeId, 0)); - } - } catch (Exception e) { - throw new HyracksDataException(e); - } - } - - private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx, - final RecordDescriptor recordDescriptor, final int partition) { - return new AbstractUnaryOutputSourceOperatorNodePushable() { - @Override - public void initialize() throws HyracksDataException { - SerializingDataWriter writer = new SerializingDataWriter(ctx, recordDescriptor, this.writer); - ReaderMapperOperator readMapOp = new ReaderMapperOperator(partition, writer); - try { - readMapOp.mapInput(); - } catch (Exception e) { - writer.fail(); - throw new HyracksDataException(e); - } finally { - readMapOp.close(); - } - } - }; - } - - public Class getMapperClass() { - return mapperClass; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java deleted file mode 100644 index 7764265..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileRecordReader; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.constraints.PartitionConstraintHelper; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper; -import org.apache.hyracks.dataflow.hadoop.util.InputSplitsProxy; -import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; -import org.apache.hyracks.hdfs.ContextFactory; - -public class HadoopReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { - private static final long serialVersionUID = 1L; - - private String inputFormatClassName; - private Map jobConfMap; - private InputSplitsProxy inputSplitsProxy; - private transient JobConf jobConf; - - public JobConf getJobConf() { - if (jobConf == null) { - jobConf = DatatypeHelper.map2JobConf(jobConfMap); - } - return jobConf; - } - - public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, Object[] splits) throws IOException { - super(spec, 0, 1); - this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf); - InputFormat inputFormat = jobConf.getInputFormat(); - RecordReader recordReader; - try { - recordReader = getRecordReader(DatatypeHelper.map2JobConf(jobConfMap), splits[0]); - } catch (Exception e) { - throw new IOException(e); - } - recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class) recordReader - .createKey().getClass(), (Class) recordReader.createValue().getClass()); - PartitionConstraintHelper.addPartitionCountConstraint(spec, this, splits.length); - inputSplitsProxy = new InputSplitsProxy(jobConf, splits); - this.inputFormatClassName = inputFormat.getClass().getName(); - } - - private RecordReader getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException, - InterruptedException { - RecordReader hadoopRecordReader = null; - if (conf.getUseNewMapper()) { - JobContext context = new ContextFactory().createJobContext(conf); - org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils - .newInstance(context.getInputFormatClass(), conf); - TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(jobConf, null); - hadoopRecordReader = (RecordReader) 
inputFormat.createRecordReader( - (org.apache.hadoop.mapreduce.InputSplit) inputSplit, taskAttemptContext); - } else { - Class inputFormatClass = conf.getInputFormat().getClass(); - InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf); - hadoopRecordReader = (RecordReader) inputFormat.getRecordReader( - (org.apache.hadoop.mapred.InputSplit) inputSplit, conf, createReporter()); - } - return hadoopRecordReader; - } - - public Object[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException { - return inputSplitsProxy.toInputSplits(getJobConf()); - } - - protected Reporter createReporter() { - return new Reporter() { - @Override - public Counter getCounter(Enum name) { - return null; - } - - @Override - public Counter getCounter(String group, String name) { - return null; - } - - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - return null; - } - - @Override - public void incrCounter(Enum key, long amount) { - - } - - @Override - public void incrCounter(String group, String counter, long amount) { - - } - - @Override - public void progress() { - - } - - @Override - public void setStatus(String status) { - - } - - @Override - public float getProgress() { - // TODO Auto-generated method stub - return 0; - } - }; - } - - @SuppressWarnings("deprecation") - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) - throws HyracksDataException { - return new AbstractUnaryOutputSourceOperatorNodePushable() { - @Override - public void initialize() throws HyracksDataException { - try { - JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap); - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - conf.setClassLoader(this.getClass().getClassLoader()); - RecordReader hadoopRecordReader; - Object key; - Object value; - Object[] splits = inputSplitsProxy.toInputSplits(conf); - Object inputSplit = splits[partition]; - - if (conf.getUseNewMapper()) { - JobContext context = new ContextFactory().createJobContext(conf); - org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils - .newInstance(context.getInputFormatClass(), conf); - TaskAttemptContext taskAttemptContext = new ContextFactory().createContext(jobConf, null); - hadoopRecordReader = (RecordReader) inputFormat.createRecordReader( - (org.apache.hadoop.mapreduce.InputSplit) inputSplit, taskAttemptContext); - } else { - Class inputFormatClass = conf.getInputFormat().getClass(); - InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf); - hadoopRecordReader = (RecordReader) inputFormat.getRecordReader( - (org.apache.hadoop.mapred.InputSplit) inputSplit, conf, createReporter()); - } - - Class inputKeyClass; - Class inputValueClass; - if (hadoopRecordReader instanceof SequenceFileRecordReader) { - inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass(); - inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass(); - } else { - inputKeyClass = hadoopRecordReader.createKey().getClass(); - inputValueClass = hadoopRecordReader.createValue().getClass(); - } - - key = hadoopRecordReader.createKey(); - value = hadoopRecordReader.createValue(); - FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx)); - RecordDescriptor outputRecordDescriptor = 
DatatypeHelper.createKeyValueRecordDescriptor( - (Class) hadoopRecordReader.createKey().getClass(), - (Class) hadoopRecordReader.createValue().getClass()); - int nFields = outputRecordDescriptor.getFieldCount(); - ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields); - writer.open(); - try { - while (hadoopRecordReader.next(key, value)) { - tb.reset(); - switch (nFields) { - case 2: - tb.addField(outputRecordDescriptor.getFields()[0], key); - case 1: - tb.addField(outputRecordDescriptor.getFields()[1], value); - } - FrameUtils - .appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), - 0, tb.getSize()); - } - appender.write(writer, true); - } catch (Exception e) { - writer.fail(); - throw new HyracksDataException(e); - } finally { - writer.close(); - } - hadoopRecordReader.close(); - } catch (InstantiationException e) { - throw new HyracksDataException(e); - } catch (IllegalAccessException e) { - throw new HyracksDataException(e); - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } catch (InterruptedException e) { - throw new HyracksDataException(e); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - }; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java deleted file mode 100644 index 6913876..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop; - -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RawKeyValueIterator; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.util.Progress; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IDataReader; -import org.apache.hyracks.api.dataflow.IDataWriter; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IComparator; -import org.apache.hyracks.api.dataflow.value.IComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.hadoop.data.KeyComparatorFactory; -import org.apache.hyracks.dataflow.hadoop.data.RawComparingComparatorFactory; -import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper; -import org.apache.hyracks.dataflow.hadoop.util.IHadoopClassFactory; -import org.apache.hyracks.dataflow.hadoop.util.MRContextUtil; -import org.apache.hyracks.dataflow.std.base.IOpenableDataWriterOperator; -import org.apache.hyracks.dataflow.std.group.DeserializedPreclusteredGroupOperator; -import org.apache.hyracks.dataflow.std.group.IGroupAggregator; -import org.apache.hyracks.dataflow.std.util.DeserializedOperatorNodePushable; -import org.apache.hyracks.hdfs.ContextFactory; - -public class HadoopReducerOperatorDescriptor extends AbstractHadoopOperatorDescriptor { - private class ReducerAggregator implements IGroupAggregator { - private Object reducer; - private DataWritingOutputCollector output; - private Reporter reporter; - private ReducerContext reducerContext; - RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() { - - @Override - public boolean next() throws IOException { - return false; - } - - @Override - public DataInputBuffer getValue() throws IOException { - return null; - } - - @Override - public Progress getProgress() { - return null; - } - - @Override - public DataInputBuffer getKey() throws IOException { - return null; - } - - @Override - public void close() throws IOException { - - } - }; - - class ReducerContext extends org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer.Context { - private HadoopReducerOperatorDescriptor.ValueIterator iterator; - - @SuppressWarnings("unchecked") - ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException, - InterruptedException, ClassNotFoundException { - ((org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer) reducer).super(new MRContextUtil() - .createReduceContext(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null, - null, null, Class.forName("org.apache.hadoop.io.NullWritable"), - 
Class.forName("org.apache.hadoop.io.NullWritable"))); - } - - public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) { - iterator = iter; - } - - @Override - public Iterable getValues() throws IOException, InterruptedException { - return new Iterable() { - @Override - public Iterator iterator() { - return iterator; - } - }; - } - - /** Start processing next unique key. */ - @Override - public boolean nextKey() throws IOException, InterruptedException { - boolean hasMore = iterator.hasNext(); - if (hasMore) { - nextKeyValue(); - } - return hasMore; - } - - /** - * Advance to the next key/value pair. - */ - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - iterator.next(); - return true; - } - - public Object getCurrentKey() { - return iterator.getKey(); - } - - @Override - public Object getCurrentValue() { - return iterator.getValue(); - } - - /** - * Generate an output key/value pair. - */ - @Override - public void write(Object key, Object value) throws IOException, InterruptedException { - output.collect(key, value); - } - - } - - public ReducerAggregator(Object reducer) throws HyracksDataException { - this.reducer = reducer; - initializeReducer(); - output = new DataWritingOutputCollector(); - reporter = new Reporter() { - @Override - public void progress() { - - } - - @Override - public void setStatus(String arg0) { - - } - - @Override - public void incrCounter(String arg0, String arg1, long arg2) { - - } - - @Override - public void incrCounter(Enum arg0, long arg1) { - - } - - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - return null; - } - - @Override - public Counter getCounter(String arg0, String arg1) { - return null; - } - - @Override - public Counter getCounter(Enum arg0) { - return null; - } - - @Override - public float getProgress() { - // TODO Auto-generated method stub - return 0; - } - }; - } - - @Override - public void aggregate(IDataReader reader, IDataWriter writer) throws HyracksDataException { - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - ValueIterator i = new ValueIterator(); - i.reset(reader); - output.setWriter(writer); - try { - if (jobConf.getUseNewReducer()) { - try { - reducerContext.setIterator(i); - ((org.apache.hadoop.mapreduce.Reducer) reducer).run(reducerContext); - } catch (InterruptedException e) { - e.printStackTrace(); - throw new HyracksDataException(e); - } - } else { - ((org.apache.hadoop.mapred.Reducer) reducer).reduce(i.getKey(), i, output, reporter); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void close() throws HyracksDataException { - // -- - close - -- - try { - if (!jobConf.getUseNewMapper()) { - ((org.apache.hadoop.mapred.Reducer) reducer).close(); - } - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - - private void initializeReducer() throws HyracksDataException { - jobConf.setClassLoader(this.getClass().getClassLoader()); - if (!jobConf.getUseNewReducer()) { - ((org.apache.hadoop.mapred.Reducer) reducer).configure(getJobConf()); - } else { - try { - reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer) reducer, jobConf); - } catch (IOException e) { - e.printStackTrace(); - throw new HyracksDataException(e); - } catch (InterruptedException e) { - e.printStackTrace(); - throw new HyracksDataException(e); - } catch (RuntimeException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - 
} - } - } - } - - private class ValueIterator implements Iterator { - private IDataReader reader; - private K2 key; - private V2 value; - - public K2 getKey() { - return key; - } - - public V2 getValue() { - return value; - } - - @Override - public boolean hasNext() { - if (value == null) { - Object[] tuple; - try { - tuple = reader.readData(); - } catch (Exception e) { - throw new RuntimeException(e); - } - if (tuple != null) { - value = (V2) tuple[1]; - } - } - return value != null; - } - - @Override - public V2 next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - V2 v = value; - value = null; - return v; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - void reset(IDataReader reader) { - this.reader = reader; - try { - Object[] tuple = reader.readData(); - key = (K2) tuple[0]; - value = (V2) tuple[1]; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - private static final long serialVersionUID = 1L; - private Class reducerClass; - private IComparatorFactory comparatorFactory; - private boolean useAsCombiner = false; - - public HadoopReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf conf, - IComparatorFactory comparatorFactory, IHadoopClassFactory classFactory, boolean useAsCombiner) { - super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory); - this.comparatorFactory = comparatorFactory; - this.useAsCombiner = useAsCombiner; - } - - private Object createReducer() throws Exception { - if (reducerClass != null) { - return ReflectionUtils.newInstance(reducerClass, getJobConf()); - } else { - Object reducer; - if (!useAsCombiner) { - if (getJobConf().getUseNewReducer()) { - JobContext jobContext = new ContextFactory().createJobContext(getJobConf()); - reducerClass = (Class>) jobContext - .getReducerClass(); - } else { - reducerClass = (Class) getJobConf().getReducerClass(); - } - } else { - if (getJobConf().getUseNewReducer()) { - JobContext jobContext = new ContextFactory().createJobContext(getJobConf()); - reducerClass = (Class>) jobContext - .getCombinerClass(); - } else { - reducerClass = (Class) getJobConf().getCombinerClass(); - } - } - reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf()); - return reducer; - } - } - - @Override - public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { - try { - if (this.comparatorFactory == null) { - String comparatorClassName = getJobConf().getOutputValueGroupingComparator().getClass().getName(); - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - RawComparator rawComparator = null; - if (comparatorClassName != null) { - Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName); - this.comparatorFactory = new KeyComparatorFactory(comparatorClazz); - - } else { - String mapOutputKeyClass = getJobConf().getMapOutputKeyClass().getName(); - if (getHadoopClassFactory() != null) { - rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass)); - } else { - rawComparator = WritableComparator.get((Class) Class - .forName(mapOutputKeyClass)); - } - this.comparatorFactory = new RawComparingComparatorFactory(rawComparator.getClass()); - } - } - IOpenableDataWriterOperator op = new DeserializedPreclusteredGroupOperator(new int[] { 0 }, - new IComparator[] { comparatorFactory.createComparator() }, new 
ReducerAggregator(createReducer())); - return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor( - getActivityId(), 0)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) { - String outputKeyClassName = null; - String outputValueClassName = null; - - if (conf.getUseNewMapper()) { - JobContext context = new ContextFactory().createJobContext(conf); - outputKeyClassName = context.getOutputKeyClass().getName(); - outputValueClassName = context.getOutputValueClass().getName(); - } else { - outputKeyClassName = conf.getOutputKeyClass().getName(); - outputValueClassName = conf.getOutputValueClass().getName(); - } - - RecordDescriptor recordDescriptor = null; - try { - if (classFactory == null) { - recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor( - (Class) Class.forName(outputKeyClassName), - (Class) Class.forName(outputValueClassName)); - } else { - recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor( - (Class) classFactory.loadClass(outputKeyClassName), - (Class) classFactory.loadClass(outputValueClassName)); - } - } catch (Exception e) { - e.printStackTrace(); - return null; - } - return recordDescriptor; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java deleted file mode 100644 index 5709082..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.NullOutputFormat; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper; -import org.apache.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor; -import org.apache.hyracks.dataflow.std.file.FileSplit; -import org.apache.hyracks.dataflow.std.file.IRecordWriter; -import org.apache.hyracks.hdfs.ContextFactory; - -public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor { - - private class HadoopFileWriter implements IRecordWriter { - - Object recordWriter; - JobConf conf; - Path finalOutputFile; - Path tempOutputFile; - Path tempDir; - - HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception { - this.recordWriter = recordWriter; - this.conf = conf; - initialize(index, conf); - } - - private void initialize(int index, JobConf conf) throws Exception { - if (!(conf.getOutputFormat() instanceof NullOutputFormat)) { - boolean isMap = conf.getNumReduceTasks() == 0; - TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index); - conf.set("mapred.task.id", taskAttempId.toString()); - String suffix = new String("part-00000"); - suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length())); - suffix = suffix + index; - outputPath = new Path(conf.get("mapred.output.dir")); - tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); - FileSystem fileSys = tempDir.getFileSystem(conf); - if (!fileSys.mkdirs(tempDir)) { - throw new IOException("Mkdirs failed to create " + tempDir.toString()); - } - tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString())); - tempOutputFile = new Path(tempOutputFile, suffix); - finalOutputFile = new Path(outputPath, suffix); - if (conf.getUseNewMapper()) { - org.apache.hadoop.mapreduce.JobContext jobContext = new ContextFactory().createJobContext(conf); - org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils - .newInstance(jobContext.getOutputFormatClass(), conf); - recordWriter = newOutputFormat.getRecordWriter(new ContextFactory().createContext(conf, - taskAttempId)); - } else { - recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix, - new Progressable() { - @Override - public void progress() { - } - }); - } - } - } - - @Override - public void write(Object[] record) throws Exception { - if (recordWriter != null) { - if (conf.getUseNewMapper()) { - ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]); - } else { - ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]); - } - } - } - - @Override - public void close() { - try { - if (recordWriter != null) { - if 
(conf.getUseNewMapper()) { - ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new ContextFactory() - .createContext(conf, new TaskAttemptID())); - } else { - ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null); - } - if (outputPath != null) { - FileSystem fileSystem = FileSystem.get(conf); - fileSystem.rename(tempOutputFile, finalOutputFile); - fileSystem.delete(tempDir, true); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - private static final long serialVersionUID = 1L; - Map jobConfMap; - - @Override - protected IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception { - JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap); - conf.setClassLoader(this.getClass().getClassLoader()); - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - FileSystem fileSystem = FileSystem.get(conf); - Object recordWriter = null; - return new HadoopFileWriter(recordWriter, index, conf); - } - - Path outputPath; - Path outputTempPath; - - protected Reporter createReporter() { - return new Reporter() { - @Override - public Counter getCounter(Enum name) { - return null; - } - - @Override - public Counter getCounter(String group, String name) { - return null; - } - - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - return null; - } - - @Override - public void incrCounter(Enum key, long amount) { - - } - - @Override - public void incrCounter(String group, String counter, long amount) { - - } - - @Override - public void progress() { - - } - - @Override - public void setStatus(String status) { - - } - - @Override - public float getProgress() { - // TODO Auto-generated method stub - return 0; - } - }; - } - - private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception { - JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap); - try { - FileSystem fileSystem = FileSystem.get(conf); - for (FileSplit fileSplit : fileSplits) { - Path path = new Path(fileSplit.getLocalFile().getFile().getPath()); - if (fileSystem.exists(path)) { - throw new Exception(" Output path : already exists : " + path); - } - } - } catch (IOException ioe) { - ioe.printStackTrace(); - throw ioe; - } - return true; - } - - private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException { - int numOutputters = conf.getNumReduceTasks() != 0 ? 
conf.getNumReduceTasks() : noOfMappers; - Object outputFormat = null; - if (conf.getUseNewMapper()) { - outputFormat = ReflectionUtils.newInstance(new ContextFactory().createJobContext(conf) - .getOutputFormatClass(), conf); - } else { - outputFormat = conf.getOutputFormat(); - } - if (outputFormat instanceof NullOutputFormat) { - FileSplit[] outputFileSplits = new FileSplit[numOutputters]; - for (int i = 0; i < numOutputters; i++) { - String outputPath = "/tmp/" + System.currentTimeMillis() + i; - outputFileSplits[i] = new FileSplit("localhost", new FileReference(new File(outputPath))); - } - return outputFileSplits; - } else { - - FileSplit[] outputFileSplits = new FileSplit[numOutputters]; - String absolutePath = FileOutputFormat.getOutputPath(conf).toString(); - for (int index = 0; index < numOutputters; index++) { - String suffix = new String("part-00000"); - suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length())); - suffix = suffix + index; - String outputPath = absolutePath + "/" + suffix; - outputFileSplits[index] = new FileSplit("localhost", outputPath); - } - return outputFileSplits; - } - } - - public HadoopWriteOperatorDescriptor(IOperatorDescriptorRegistry jobSpec, JobConf jobConf, int numMapTasks) - throws Exception { - super(jobSpec, getOutputSplits(jobConf, numMapTasks)); - this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf); - checkIfCanWriteToHDFS(super.splits); - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java deleted file mode 100644 index 62c37ed..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/AbstractClassBasedDelegate.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.ObjectStreamException;
-import java.io.Serializable;
-
-public class AbstractClassBasedDelegate<T> implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private Class<? extends T> klass;
-    protected transient T instance;
-
-    public AbstractClassBasedDelegate(Class<? extends T> klass) {
-        this.klass = klass;
-        init();
-    }
-
-    protected Object readResolve() throws ObjectStreamException {
-        init();
-        return this;
-    }
-
-    private void init() {
-        try {
-            instance = klass.newInstance();
-        } catch (InstantiationException e) {
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
deleted file mode 100644
index fe8a612..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopHashTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import java.io.DataInputStream;
-
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class HadoopHashTuplePartitionComputerFactory<K extends Writable> implements ITuplePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
-    private final ISerializerDeserializer<K> keyIO;
-
-    public HadoopHashTuplePartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
-        this.keyIO = keyIO;
-    }
-
-    @Override
-    public ITuplePartitionComputer createPartitioner() {
-        return new ITuplePartitionComputer() {
-            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-            private final DataInputStream dis = new DataInputStream(bbis);
-
-            @Override
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-                int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, 0);
-                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
-                K key = keyIO.deserialize(dis);
-                int h = key.hashCode();
-                if (h < 0) {
-                    h = -h;
-                }
-                return h % nParts;
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
deleted file mode 100644
index b20c6e0..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.hyracks.dataflow.hadoop.data; - -import java.io.DataInputStream; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Partitioner; - -import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; - -public class HadoopNewPartitionerTuplePartitionComputerFactory extends - AbstractClassBasedDelegate> implements ITuplePartitionComputerFactory { - private static final long serialVersionUID = 1L; - private final ISerializerDeserializer keyIO; - private final ISerializerDeserializer valueIO; - - public HadoopNewPartitionerTuplePartitionComputerFactory(Class> klass, - ISerializerDeserializer keyIO, ISerializerDeserializer valueIO) { - super(klass); - this.keyIO = keyIO; - this.valueIO = valueIO; - } - - @Override - public ITuplePartitionComputer createPartitioner() { - return new ITuplePartitionComputer() { - private final ByteBufferInputStream bbis = new ByteBufferInputStream(); - private final DataInputStream dis = new DataInputStream(bbis); - - @Override - public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException { - int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength() - + accessor.getFieldStartOffset(tIndex, 0); - bbis.setByteBuffer(accessor.getBuffer(), keyStart); - K key = keyIO.deserialize(dis); - int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength() - + accessor.getFieldStartOffset(tIndex, 1); - bbis.setByteBuffer(accessor.getBuffer(), valueStart); - V value = valueIO.deserialize(dis); - return instance.getPartition(key, value, nParts); - } - }; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java deleted file mode 100644 index 127eed9..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop.data; - -import java.io.DataInputStream; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.Partitioner; - -import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; - -public class HadoopPartitionerTuplePartitionComputerFactory extends - AbstractClassBasedDelegate> implements ITuplePartitionComputerFactory { - private static final long serialVersionUID = 1L; - private final ISerializerDeserializer keyIO; - private final ISerializerDeserializer valueIO; - - public HadoopPartitionerTuplePartitionComputerFactory(Class> klass, - ISerializerDeserializer keyIO, ISerializerDeserializer valueIO) { - super(klass); - this.keyIO = keyIO; - this.valueIO = valueIO; - } - - @Override - public ITuplePartitionComputer createPartitioner() { - return new ITuplePartitionComputer() { - private final ByteBufferInputStream bbis = new ByteBufferInputStream(); - private final DataInputStream dis = new DataInputStream(bbis); - - @Override - public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException { - int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength() - + accessor.getFieldStartOffset(tIndex, 0); - bbis.setByteBuffer(accessor.getBuffer(), keyStart); - K key = keyIO.deserialize(dis); - int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength() - + accessor.getFieldStartOffset(tIndex, 1); - bbis.setByteBuffer(accessor.getBuffer(), valueStart); - V value = valueIO.deserialize(dis); - return instance.getPartition(key, value, nParts); - } - }; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java deleted file mode 100644 index b74fdde..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyBinaryComparatorFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop.data; - -import org.apache.hadoop.io.RawComparator; - -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.dataflow.common.util.ReflectionUtils; - -public class KeyBinaryComparatorFactory implements IBinaryComparatorFactory { - private static final long serialVersionUID = 1L; - - private Class> cmpClass; - - public KeyBinaryComparatorFactory(Class> cmpClass) { - this.cmpClass = cmpClass; - } - - @Override - public IBinaryComparator createBinaryComparator() { - final RawComparator instance = ReflectionUtils.createInstance(cmpClass); - return new IBinaryComparator() { - @Override - public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - return instance.compare(b1, s1, l1, b2, s2, l2); - } - }; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java deleted file mode 100644 index 01024bd..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/KeyComparatorFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-package org.apache.hyracks.dataflow.hadoop.data;
-
-import org.apache.hadoop.io.RawComparator;
-
-import org.apache.hyracks.api.dataflow.value.IComparator;
-import org.apache.hyracks.api.dataflow.value.IComparatorFactory;
-import org.apache.hyracks.dataflow.common.util.ReflectionUtils;
-
-public class KeyComparatorFactory<T> implements IComparatorFactory<T> {
-    private static final long serialVersionUID = 1L;
-    private Class<? extends RawComparator<T>> cmpClass;
-
-    public KeyComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
-        this.cmpClass = cmpClass;
-    }
-
-    @Override
-    public IComparator<T> createComparator() {
-        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
-        return new IComparator<T>() {
-            @Override
-            public int compare(T o1, T o2) {
-                return instance.compare(o1, o2);
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java
deleted file mode 100644
index d2f0ead..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/RawComparingComparatorFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.hyracks.dataflow.hadoop.data; - -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.WritableComparable; - -import org.apache.hyracks.api.dataflow.value.IComparator; -import org.apache.hyracks.api.dataflow.value.IComparatorFactory; -import org.apache.hyracks.dataflow.common.util.ReflectionUtils; - -public class RawComparingComparatorFactory implements IComparatorFactory> { - private Class klass; - - public RawComparingComparatorFactory(Class klass) { - this.klass = klass; - } - - private static final long serialVersionUID = 1L; - - @Override - public IComparator> createComparator() { - final RawComparator instance = ReflectionUtils.createInstance(klass); - return new IComparator>() { - @Override - public int compare(WritableComparable o1, WritableComparable o2) { - return instance.compare(o1, o2); - } - }; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java deleted file mode 100644 index 01f8755..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop.data; - -import org.apache.hadoop.io.RawComparator; - -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.dataflow.common.util.ReflectionUtils; - -public class WritableComparingBinaryComparatorFactory implements IBinaryComparatorFactory { - private static final long serialVersionUID = 1L; - - private Class> cmpClass; - - public WritableComparingBinaryComparatorFactory(Class> cmpClass) { - this.cmpClass = cmpClass; - } - - @Override - public IBinaryComparator createBinaryComparator() { - final RawComparator instance = ReflectionUtils.createInstance(cmpClass); - return new IBinaryComparator() { - @Override - public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - return instance.compare(b1, s1, l1, b2, s2, l2); - } - }; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java deleted file mode 100644 index b248bff..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.dataflow.hadoop.mapreduce; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; -import org.apache.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory; -import org.apache.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory; -import org.apache.hyracks.dataflow.hadoop.util.DatatypeHelper; -import org.apache.hyracks.hdfs.ContextFactory; - -public class HadoopHelper { - public static final int KEY_FIELD_INDEX = 0; - public static final int VALUE_FIELD_INDEX = 1; - public static final int BLOCKID_FIELD_INDEX = 2; - private static final int[] KEY_SORT_FIELDS = new int[] { 0 }; - - private MarshalledWritable mConfig; - private Configuration config; - private Job job; - - public HadoopHelper(MarshalledWritable mConfig) throws HyracksDataException { - this.mConfig = mConfig; - ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - config = mConfig.get(); - config.setClassLoader(getClass().getClassLoader()); - job = new Job(config); - } catch (Exception e) { - throw new HyracksDataException(e); - } finally { - Thread.currentThread().setContextClassLoader(ctxCL); - } - } - - public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException { - try { - return new RecordDescriptor( - new ISerializerDeserializer[] { - DatatypeHelper.createSerializerDeserializer((Class) job - .getMapOutputKeyClass()), - DatatypeHelper.createSerializerDeserializer((Class) job - .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE }); - - } catch (Exception e) { - throw new HyracksDataException(e); - } - } - - public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException { - try { - return new RecordDescriptor( - new ISerializerDeserializer[] { - DatatypeHelper.createSerializerDeserializer((Class) job - .getMapOutputKeyClass()), - DatatypeHelper.createSerializerDeserializer((Class) job - .getMapOutputValueClass()) }); - - } catch (Exception e) { - throw new HyracksDataException(e); - } - } - - public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) throws HyracksDataException { - ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); - try { - 
Thread.currentThread().setContextClassLoader(config.getClassLoader()); - return new ContextFactory().createContext(config, taId); - } catch (HyracksDataException e) { - e.printStackTrace(); - throw new HyracksDataException(e); - } finally { - Thread.currentThread().setContextClassLoader(ctxCL); - } - } - - public JobContext createJobContext() { - ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(config.getClassLoader()); - return new ContextFactory().createJobContext(config); - } finally { - Thread.currentThread().setContextClassLoader(ctxCL); - } - } - - public Mapper getMapper() throws HyracksDataException { - try { - return (Mapper) HadoopTools.newInstance(job.getMapperClass()); - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } catch (InstantiationException e) { - throw new HyracksDataException(e); - } catch (IllegalAccessException e) { - throw new HyracksDataException(e); - } - } - - public Reducer getReducer() throws HyracksDataException { - try { - return (Reducer) HadoopTools.newInstance(job.getReducerClass()); - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } catch (InstantiationException e) { - throw new HyracksDataException(e); - } catch (IllegalAccessException e) { - throw new HyracksDataException(e); - } - } - - public Reducer getCombiner() throws HyracksDataException { - try { - return (Reducer) HadoopTools.newInstance(job.getCombinerClass()); - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } catch (InstantiationException e) { - throw new HyracksDataException(e); - } catch (IllegalAccessException e) { - throw new HyracksDataException(e); - } - } - - public InputFormat getInputFormat() throws HyracksDataException { - try { - return (InputFormat) ReflectionUtils.newInstance(job.getInputFormatClass(), config); - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } - } - - public List getInputSplits() throws HyracksDataException { - ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - InputFormat fmt = getInputFormat(); - JobContext jCtx = new ContextFactory().createJobContext(config); - try { - return fmt.getSplits(jCtx); - } catch (IOException e) { - throw new HyracksDataException(e); - } catch (InterruptedException e) { - throw new HyracksDataException(e); - } - } finally { - Thread.currentThread().setContextClassLoader(ctxCL); - } - } - - public IBinaryComparatorFactory[] getSortComparatorFactories() { - WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job - .getSortComparator().getClass()); - - return new IBinaryComparatorFactory[] { comparatorFactory }; - } - - public IBinaryComparatorFactory[] getGroupingComparatorFactories() { - WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job - .getGroupingComparator().getClass()); - - return new IBinaryComparatorFactory[] { comparatorFactory }; - } - - public RawComparator getRawGroupingComparator() { - return job.getGroupingComparator(); - } - - public int getSortFrameLimit(IHyracksCommonContext ctx) { - int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100); - return (int) (((long) sortMemory * 1024 * 1024) / ctx.getInitialFrameSize()); - } - - public Job getJob() { - return job; - } - - public MarshalledWritable 
getMarshalledConfiguration() { - return mConfig; - } - - public Configuration getConfiguration() { - return config; - } - - public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException { - int nReducers = job.getNumReduceTasks(); - try { - return new HadoopNewPartitionerTuplePartitionComputerFactory( - (Class>) job.getPartitionerClass(), - (ISerializerDeserializer) DatatypeHelper - .createSerializerDeserializer((Class) job.getMapOutputKeyClass()), - (ISerializerDeserializer) DatatypeHelper - .createSerializerDeserializer((Class) job.getMapOutputValueClass())); - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } - } - - public int[] getSortFields() { - return KEY_SORT_FIELDS; - } - - public ISerializerDeserializer getMapOutputKeySerializerDeserializer() { - return (ISerializerDeserializer) DatatypeHelper.createSerializerDeserializer((Class) job - .getMapOutputKeyClass()); - } - - public ISerializerDeserializer getMapOutputValueSerializerDeserializer() { - return (ISerializerDeserializer) DatatypeHelper.createSerializerDeserializer((Class) job - .getMapOutputValueClass()); - } - - public FileSystem getFilesystem() throws HyracksDataException { - try { - return FileSystem.get(config); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - - public OutputFormat getOutputFormat() throws HyracksDataException { - try { - return (OutputFormat) ReflectionUtils.newInstance(job.getOutputFormatClass(), config); - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } - } - - public boolean hasCombiner() throws HyracksDataException { - try { - return job.getCombinerClass() != null; - } catch (ClassNotFoundException e) { - throw new HyracksDataException(e); - } - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java deleted file mode 100644 index fb86385..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-public class HadoopTools {
-    public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
-            IllegalAccessException {
-        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
-            Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
-            return newInstance(clazz);
-        } finally {
-            Thread.currentThread().setContextClassLoader(ctxCL);
-        }
-    }
-
-    public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
-        return clazz.newInstance();
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
deleted file mode 100644
index 4892935..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.hyracks.dataflow.hadoop.mapreduce; - -import java.util.BitSet; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.hyracks.api.comm.IFrameReader; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.IPartitionCollector; -import org.apache.hyracks.api.comm.IPartitionWriterFactory; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor; -import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader; -import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; -import org.apache.hyracks.dataflow.std.connectors.PartitionDataWriter; - -public class HashPartitioningShuffleConnectorDescriptor extends AbstractMToNConnectorDescriptor { - private static final long serialVersionUID = 1L; - - private final MarshalledWritable mConfig; - - public HashPartitioningShuffleConnectorDescriptor(IConnectorDescriptorRegistry spec, MarshalledWritable mConfig) { - super(spec); - this.mConfig = mConfig; - } - - @Override - public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, - IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { - HadoopHelper helper = new HadoopHelper(mConfig); - ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer(); - return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner()); - } - - @Override - public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, - int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - BitSet expectedPartitions = new BitSet(); - expectedPartitions.set(0, nProducerPartitions); - NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions, - expectedPartitions); - IFrameReader frameReader = new ShuffleFrameReader(ctx, channelReader, mConfig); - return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader, - channelReader); - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java deleted file mode 100644 index 9cafaea..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IInputSplitProvider {
-    public InputSplit next() throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
deleted file mode 100644
index 87f7cd7..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IInputSplitProviderFactory extends Serializable {
-    public IInputSplitProvider createInputSplitProvider(int id) throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
deleted file mode 100644
index d0e6ce2..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.hadoop.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -public class InputFileSplit extends InputSplit implements Writable { - private Path file; - private long start; - private long length; - private int blockId; - private String[] hosts; - private long scheduleTime; - - public InputFileSplit() { - } - - public InputFileSplit(int blockId, Path file, long start, long length, String[] hosts, long schedule_time) { - this.blockId = blockId; - this.file = file; - this.start = start; - this.length = length; - this.hosts = hosts; - this.scheduleTime = schedule_time; - } - - public int blockId() { - return blockId; - } - - public long scheduleTime() { - return this.scheduleTime; - } - - public Path getPath() { - return file; - } - - /** The position of the first byte in the file to process. */ - public long getStart() { - return start; - } - - /** The number of bytes in the file to process. */ - @Override - public long getLength() { - return length; - } - - @Override - public String toString() { - return file + ":" + start + "+" + length; - } - - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, file.toString()); - out.writeLong(start); - out.writeLong(length); - out.writeInt(blockId); - out.writeLong(this.scheduleTime); - } - - @Override - public void readFields(DataInput in) throws IOException { - file = new Path(Text.readString(in)); - start = in.readLong(); - length = in.readLong(); - hosts = null; - this.blockId = in.readInt(); - this.scheduleTime = in.readLong(); - } - - @Override - public String[] getLocations() throws IOException { - if (this.hosts == null) { - return new String[] {}; - } else { - return this.hosts; - } - } - - public FileSplit toFileSplit() { - return new FileSplit(file, start, length, hosts); - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java deleted file mode 100644 index 3f02279..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/KVIterator.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.util.Progress;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class KVIterator implements RawKeyValueIterator {
-    private final HadoopHelper helper;
-    private FrameTupleAccessor accessor;
-    private DataInputBuffer kBuffer;
-    private DataInputBuffer vBuffer;
-    private List