From hcatalog-commits-return-763-apmail-incubator-hcatalog-commits-archive=incubator.apache.org@incubator.apache.org Tue Mar 20 19:10:23 2012 Return-Path: X-Original-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 508349104 for ; Tue, 20 Mar 2012 19:10:23 +0000 (UTC) Received: (qmail 18342 invoked by uid 500); 20 Mar 2012 19:10:22 -0000 Delivered-To: apmail-incubator-hcatalog-commits-archive@incubator.apache.org Received: (qmail 18310 invoked by uid 500); 20 Mar 2012 19:10:22 -0000 Mailing-List: contact hcatalog-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hcatalog-dev@incubator.apache.org Delivered-To: mailing list hcatalog-commits@incubator.apache.org Received: (qmail 18300 invoked by uid 99); 20 Mar 2012 19:10:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Mar 2012 19:10:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Mar 2012 19:10:11 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 67AD62388847; Tue, 20 Mar 2012 19:09:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1303100 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/data/transfer/ src/java/org/apache/hcatalog/data/transfer/impl/ src/java/org/apache/hcatalog/data/transfer/state/ src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/u... 
Date: Tue, 20 Mar 2012 19:09:47 -0000 To: hcatalog-commits@incubator.apache.org From: hashutosh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120320190948.67AD62388847@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: hashutosh Date: Tue Mar 20 19:09:46 2012 New Revision: 1303100 URL: http://svn.apache.org/viewvc?rev=1303100&view=rev Log: HCATALOG-287 : Add data api to HCatalog (hashutosh) Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java 
incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java Modified: incubator/hcatalog/trunk/CHANGES.txt Modified: incubator/hcatalog/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1303100&r1=1303099&r2=1303100&view=diff ============================================================================== --- incubator/hcatalog/trunk/CHANGES.txt (original) +++ incubator/hcatalog/trunk/CHANGES.txt Tue Mar 20 19:09:46 2012 @@ -23,6 +23,7 @@ Trunk (unreleased changes) INCOMPATIBLE CHANGES NEW FEATURES + HCAT-287 Add data api to HCatalog (hashutosh) IMPROVEMENTS HCAT-233 gitignore file (enis via gates) Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.hcatalog.data.transfer; + + import java.util.Map; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.mapreduce.InputSplit; + import org.apache.hcatalog.data.transfer.impl.HCatInputFormatReader; + import org.apache.hcatalog.data.transfer.impl.HCatOutputFormatWriter; + import org.apache.hcatalog.data.transfer.state.DefaultStateProvider; + import org.apache.hcatalog.data.transfer.state.StateProvider; + + /** Use this factory to get instances of {@link HCatReader} or {@link HCatWriter} at master and slave nodes. + */ + + public class DataTransferFactory { + + /** + * This should be called once from master node to obtain an instance of {@link HCatReader} + * @param re built using {@link ReadEntity.Builder} + * @param config Any configuration which master node wants to pass to HCatalog + * @return {@link HCatReader} + */ + public static HCatReader getHCatReader(final ReadEntity re, final Map config) { + // In future, this may examine ReadEntity and/or config to return appropriate HCatReader + return new HCatInputFormatReader(re, config); + } + + /** + * This should only be called once from every slave node to obtain an instance of {@link HCatReader} + * @param split obtained at master node. + * @param config obtained at master node. 
+ * @return {@link HCatReader} + */ + public static HCatReader getHCatReader(final InputSplit split, final Configuration config) { + // In future, this may examine config to return appropriate HCatReader + return getHCatReader(split, config, DefaultStateProvider.get()); + } + + /** + * This should only be called once from every slave node to obtain an instance of {@link HCatReader} + * This should be called if external system has some state to provide to HCatalog + * @param split obtained at master node. + * @param config obtained at master node. + * @param sp {@link StateProvider} + * @return {@link HCatReader} + */ + public static HCatReader getHCatReader(final InputSplit split, final Configuration config, StateProvider sp) { + // In future, this may examine config to return appropriate HCatReader + return new HCatInputFormatReader(split, config, sp); + } + + /** This should be called at master node to obtain an instance of {@link HCatWriter} + * @param we built using {@link WriteEntity.Builder} + * @param config Any configuration which master wants to pass to HCatalog + * @return {@link HCatWriter} + */ + public static HCatWriter getHCatWriter(final WriteEntity we, final Map config) { + // In future, this may examine WriteEntity and/or config to return appropriate HCatWriter + return new HCatOutputFormatWriter(we, config); + } + + /** This should be called at slave nodes to obtain an instance of {@link HCatWriter} + * @param cntxt {@link WriterContext} obtained at master node. + * @return {@link HCatWriter} + */ + public static HCatWriter getHCatWriter(final WriterContext cntxt) { + // In future, this may examine context to return appropriate HCatWriter + return getHCatWriter(cntxt, DefaultStateProvider.get()); + } + + /** This should be called at slave nodes to obtain an instance of {@link HCatWriter} + * If external system has some mechanism for providing state to HCatalog, this constructor + * can be used. + * @param cntxt {@link WriterContext} obtained at master node. 
+ * @param sp {@link StateProvider} + * @return {@link HCatWriter} + */ + public static HCatWriter getHCatWriter(final WriterContext cntxt, final StateProvider sp) { + // In future, this may examine context to return appropriate HCatWriter + return new HCatOutputFormatWriter(cntxt.getConf(), sp); + } +} Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/EntityBase.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.data.transfer; + +import java.util.Map; + +/** This is a base class for {@link ReadEntity.Builder} / {@link WriteEntity.Builder}. Many fields in them are common, + * so this class contains the common fields. 
+ */ + +abstract class EntityBase { + + String region; + String tableName; + String dbName; + Map partitionKVs; + + + + /** Common methods for {@link ReadEntity} and {@link WriteEntity} + */ + + abstract static class Entity extends EntityBase{ + + public String getRegion() { + return region; + } + public String getTableName() { + return tableName; + } + public String getDbName() { + return dbName; + } + public Map getPartitionKVs() { + return partitionKVs; + } + } +} Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatReader.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.hcatalog.data.transfer; + + import java.util.Iterator; + import java.util.Map; + import java.util.Map.Entry; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hcatalog.common.HCatException; + import org.apache.hcatalog.data.HCatRecord; + import org.apache.hcatalog.data.transfer.state.StateProvider; + + /** This abstract class is internal to HCatalog and abstracts away the notion of + * underlying system from which reads will be done. + */ + + public abstract class HCatReader{ + + /** This should be called at master node to obtain {@link ReaderContext} which then should be + * serialized and sent to slave nodes. + * @return {@link ReaderContext} + * @throws HCatException + */ + public abstract ReaderContext prepareRead() throws HCatException; + + /** This should be called at slave nodes to read {@link HCatRecord}s + * @return {@link Iterator} of {@link HCatRecord} + * @throws HCatException + */ + public abstract Iterator read() throws HCatException; + + /** This constructor will be invoked by {@link DataTransferFactory} at master node. + * Don't use this constructor. Instead, use {@link DataTransferFactory} + * @param re + * @param config + */ + protected HCatReader(final ReadEntity re, final Map config) { + this(config); + this.re = re; + } + + /** This constructor will be invoked by {@link DataTransferFactory} at slave nodes. + * Don't use this constructor. Instead, use {@link DataTransferFactory} + * @param config + * @param sp + */ + + protected HCatReader(final Configuration config, StateProvider sp) { + this.conf = config; + this.sp = sp; + } + + protected ReadEntity re; // This will be null at slaves. + protected Configuration conf; + protected ReaderContext info; + protected StateProvider sp; // This will be null at master. 
+ + private HCatReader(final Map config) { + Configuration conf = new Configuration(); + if (null != config) { + for(Entry kv : config.entrySet()){ + conf.set(kv.getKey(), kv.getValue()); + } + } + this.conf = conf; + } + + public Configuration getConf() { + if (null == conf) { + throw new IllegalStateException("HCatReader is not constructed correctly."); + } + return conf; + } +} Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.hcatalog.data.transfer; + + import java.util.Iterator; + import java.util.Map; + import java.util.Map.Entry; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hcatalog.common.HCatException; + import org.apache.hcatalog.data.HCatRecord; + import org.apache.hcatalog.data.transfer.state.StateProvider; + + /** This abstraction is internal to HCatalog. This is to facilitate writing to HCatalog from external + * systems. Don't try to instantiate this directly. Instead, use {@link DataTransferFactory} + */ + + public abstract class HCatWriter { + + protected Configuration conf; + protected WriteEntity we; // This will be null at slave nodes. + protected WriterContext info; + protected StateProvider sp; + + /** External system should invoke this method exactly once from a master node. + * @return {@link WriterContext} This should be serialized and sent to slave nodes to + * construct HCatWriter there. + * @throws HCatException + */ + public abstract WriterContext prepareWrite() throws HCatException; + + /** This method should be used at slave nodes to perform writes. + * @param recordItr {@link Iterator} of records to be written into HCatalog. + * @throws {@link HCatException} + */ + public abstract void write(final Iterator recordItr) throws HCatException; + + /** This method should be called at master node. Primary purpose of this is to do metadata commit. + * @throws {@link HCatException} + */ + public abstract void commit(final WriterContext context) throws HCatException; + + /** This method should be called at master node. Primary purpose of this is to do cleanups in case + * of failures. + * @throws {@link HCatException} + */ + public abstract void abort(final WriterContext context) throws HCatException; + + /** + * This constructor will be used at master node + * @param we WriteEntity defines where in storage records should be written to. 
+ * @param config Any configuration which external system wants to communicate to HCatalog + * for performing writes. + */ + protected HCatWriter(final WriteEntity we, final Map config) { + this(config); + this.we = we; + } + + /** This constructor will be used at slave nodes. + * @param config + */ + protected HCatWriter(final Configuration config, final StateProvider sp) { + this.conf = config; + this.sp = sp; + } + + private HCatWriter(final Map config) { + Configuration conf = new Configuration(); + if(config != null){ + // user is providing config, so it could be null. + for(Entry kv : config.entrySet()){ + conf.set(kv.getKey(), kv.getValue()); + } + } + + this.conf = conf; + } +} Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.data.transfer; + +import java.util.Map; + +public class ReadEntity extends EntityBase.Entity{ + + private String filterString; + + /** Don't instantiate {@link ReadEntity} directly. Use, {@link ReadEntity.Builder} instead. + * + */ + private ReadEntity() { + // Not allowed + } + + private ReadEntity(Builder builder) { + + this.region = builder.region; + this.dbName = builder.dbName; + this.tableName = builder.tableName; + this.partitionKVs = builder.partitionKVs; + this.filterString = builder.filterString; + } + + public String getFilterString() { + return this.filterString; + } + + /** This class should be used to build {@link ReadEntity}. It follows builder pattern, letting you build + * your {@link ReadEntity} with whatever level of detail you want. + * + */ + public static class Builder extends EntityBase { + + private String filterString; + + public Builder withRegion(final String region) { + this.region = region; + return this; + } + + + public Builder withDatabase(final String dbName) { + this.dbName = dbName; + return this; + } + + public Builder withTable(final String tblName) { + this.tableName = tblName; + return this; + } + + public Builder withPartition(final Map partKVs) { + this.partitionKVs = partKVs; + return this; + } + + public Builder withFilter(String filterString) { + this.filterString = filterString; + return this; + } + + public ReadEntity build() { + return new ReadEntity(this); + } + } +} \ No newline at end of file Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1303100&view=auto ============================================================================== --- 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.data.transfer; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hcatalog.mapreduce.HCatSplit; + +/** This class will contain information of different {@link InputSplit} obtained at master node + * and configuration. This class implements {@link Externalizable} so it can be serialized using + * standard java mechanisms. 
+ */ +public class ReaderContext implements Externalizable, Configurable { + + private static final long serialVersionUID = -2656468331739574367L; + private List splits; + private Configuration conf; + + public ReaderContext() { + this.splits = new ArrayList(); + this.conf = new Configuration(); + } + + public void setInputSplits(final List splits) { + this.splits = splits; + } + + public List getSplits() { + return splits; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(final Configuration config) { + conf = config; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + conf.write(out); + out.writeInt(splits.size()); + for (InputSplit split : splits) { + ((HCatSplit)split).write(out); + } + } + + @Override + public void readExternal(ObjectInput in) throws IOException, + ClassNotFoundException { + conf.readFields(in); + int numOfSplits = in.readInt(); + for (int i=0 ; i < numOfSplits; i++) { + HCatSplit split = new HCatSplit(); + split.readFields(in); + splits.add(split); + } + } +} Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.data.transfer; + +import java.util.Map; + +public class WriteEntity extends EntityBase.Entity{ + + /** Don't instantiate {@link WriteEntity} directly. Use, {@link Builder} to build + * {@link WriteEntity}. + */ + + private WriteEntity() { + // Not allowed. + } + + private WriteEntity(Builder builder) { + this.region = builder.region; + this.dbName = builder.dbName; + this.tableName = builder.tableName; + this.partitionKVs = builder.partitionKVs; + } + + /** This class should be used to build {@link WriteEntity}. It follows builder pattern, letting you build + * your {@link WriteEntity} with whatever level of detail you want. 
+ * + */ + public static class Builder extends EntityBase{ + + public Builder withRegion(final String region) { + this.region = region; + return this; + } + + public Builder withDatabase(final String dbName) { + this.dbName = dbName; + return this; + } + + public Builder withTable(final String tblName) { + this.tableName = tblName; + return this; + } + + public Builder withPartition(final Map partKVs) { + this.partitionKVs = partKVs; + return this; + } + + public WriteEntity build() { + return new WriteEntity(this); + } + + } +} Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/WriterContext.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.data.transfer; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** This contains information obtained at master node to help prepare slave nodes for writer. + * This class implements {@link Externalizable} so it can be serialized using + * standard java mechanisms. Master should serialize it and make it available to slaves to + * prepare for writes. + */ +public class WriterContext implements Externalizable, Configurable{ + + private static final long serialVersionUID = -5899374262971611840L; + private Configuration conf; + + public WriterContext() { + conf = new Configuration(); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(final Configuration config) { + this.conf = config; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + conf.write(out); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, + ClassNotFoundException { + conf.readFields(in); + } +} Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1303100&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Tue Mar 20 19:09:46 2012 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
package org.apache.hcatalog.data.transfer.impl;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.transfer.HCatReader;
import org.apache.hcatalog.data.transfer.ReadEntity;
import org.apache.hcatalog.data.transfer.ReaderContext;
import org.apache.hcatalog.data.transfer.state.StateProvider;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

/**
 * {@link HCatReader} implementation that reads via {@link HCatInputFormat}.
 */
public class HCatInputFormatReader extends HCatReader {

    /** Split assigned to this slave-side reader; unused on the master side. */
    private InputSplit split;

    /**
     * Slave-side constructor.
     *
     * @param split  input split this reader will consume
     * @param config configuration prepared by the master
     * @param sp     state provider supplying slave ids
     */
    public HCatInputFormatReader(InputSplit split, Configuration config,
            StateProvider sp) {
        super(config, sp);
        this.split = split;
    }

    /**
     * Master-side constructor.
     *
     * @param info   entity (db/table/filter) to read from
     * @param config external configuration key/value pairs
     */
    public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
        super(info, config);
    }

    /**
     * Runs on the master: resolves the table, computes input splits, and
     * packages them with the configuration into a {@link ReaderContext}.
     *
     * @throws HCatException if the underlying input format cannot be set up
     */
    @Override
    public ReaderContext prepareRead() throws HCatException {
        try {
            Job job = new Job(conf);
            InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
                    re.getTableName(), re.getFilterString());
            HCatInputFormat.setInput(job, jobInfo);
            HCatInputFormat hcif = new HCatInputFormat();
            ReaderContext cntxt = new ReaderContext();
            // JobContext only needs the configuration here; the job id is unused.
            cntxt.setInputSplits(hcif.getSplits(
                    new JobContext(job.getConfiguration(), null)));
            cntxt.setConf(job.getConfiguration());
            return cntxt;
        } catch (IOException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        } catch (InterruptedException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        }
    }

    /**
     * Runs on a slave: opens a record reader over this reader's split and
     * returns an iterator over its {@link HCatRecord}s.
     *
     * @throws HCatException if the record reader cannot be initialized
     */
    @Override
    public Iterator<HCatRecord> read() throws HCatException {
        HCatInputFormat inpFmt = new HCatInputFormat();
        RecordReader<WritableComparable, HCatRecord> rr;
        try {
            TaskAttemptContext cntxt =
                    new TaskAttemptContext(conf, new TaskAttemptID());
            rr = inpFmt.createRecordReader(split, cntxt);
            rr.initialize(split, cntxt);
        } catch (IOException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        } catch (InterruptedException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        }
        return new HCatRecordItr(rr);
    }

    /**
     * Iterator adapter over a {@link RecordReader}.
     *
     * <p>BUGFIX: the original advanced the record reader on every
     * {@code hasNext()} call, so calling {@code hasNext()} twice in a row
     * silently skipped a record. A one-record lookahead now makes
     * {@code hasNext()} idempotent, as the {@link Iterator} contract requires,
     * and {@code next()} past the end throws {@link NoSuchElementException}.</p>
     */
    private static class HCatRecordItr implements Iterator<HCatRecord> {

        private final RecordReader<WritableComparable, HCatRecord> curRecReader;
        /** True once nextKeyValue() has been called for the pending position. */
        private boolean lookedAhead = false;
        /** Result of the pending lookahead; valid only when lookedAhead. */
        private boolean hasRecord = false;

        HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
            curRecReader = rr;
        }

        @Override
        public boolean hasNext() {
            if (!lookedAhead) {
                try {
                    hasRecord = curRecReader.nextKeyValue();
                    lookedAhead = true;
                    if (!hasRecord) {
                        // End of stream: release the record reader's resources.
                        curRecReader.close();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            return hasRecord;
        }

        @Override
        public HCatRecord next() {
            if (!hasNext()) {
                throw new NoSuchElementException("No more records");
            }
            lookedAhead = false; // consume the lookahead
            try {
                return curRecReader.getCurrentValue();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Not allowed");
        }
    }
}
package org.apache.hcatalog.data.transfer.impl;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.transfer.HCatWriter;
import org.apache.hcatalog.data.transfer.WriteEntity;
import org.apache.hcatalog.data.transfer.WriterContext;
import org.apache.hcatalog.data.transfer.state.StateProvider;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

/**
 * {@link HCatWriter} implementation that writes via {@link HCatOutputFormat}.
 */
public class HCatOutputFormatWriter extends HCatWriter {

    /**
     * Master-side constructor.
     *
     * @param we     entity (db/table/partition) to write to
     * @param config external configuration key/value pairs
     */
    public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
        super(we, config);
    }

    /**
     * Slave-side constructor.
     *
     * @param config configuration prepared by the master
     * @param sp     state provider supplying slave ids
     */
    public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
        super(config, sp);
    }

    /**
     * Runs on the master: validates the output spec, sets up the job on the
     * output committer, and packages the resulting configuration into a
     * {@link WriterContext} for the slaves.
     *
     * @throws HCatException if output setup fails
     */
    @Override
    public WriterContext prepareWrite() throws HCatException {
        OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
                we.getTableName(), we.getPartitionKVs());
        try {
            Job job = new Job(conf);
            HCatOutputFormat.setOutput(job, jobInfo);
            HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
            HCatOutputFormat outFormat = new HCatOutputFormat();
            outFormat.checkOutputSpecs(job);
            outFormat.getOutputCommitter(new TaskAttemptContext(
                    job.getConfiguration(), new TaskAttemptID())).setupJob(job);
            WriterContext cntxt = new WriterContext();
            cntxt.setConf(job.getConfiguration());
            return cntxt;
        } catch (IOException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        } catch (InterruptedException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        }
    }

    /**
     * Runs on a slave: writes every record from the iterator through the
     * output format, committing the task on success and aborting it on
     * failure.
     *
     * @param recordItr records to write
     * @throws HCatException if the write or the task commit fails
     */
    @Override
    public void write(Iterator<HCatRecord> recordItr) throws HCatException {
        int id = sp.getId();
        setVarsInConf(id);
        HCatOutputFormat outFormat = new HCatOutputFormat();
        TaskAttemptContext cntxt =
                new TaskAttemptContext(conf, new TaskAttemptID(new TaskID(), id));
        OutputCommitter committer = null;
        RecordWriter<WritableComparable<?>, HCatRecord> writer;
        try {
            committer = outFormat.getOutputCommitter(cntxt);
            committer.setupTask(cntxt);
            writer = outFormat.getRecordWriter(cntxt);
            while (recordItr.hasNext()) {
                HCatRecord rec = recordItr.next();
                writer.write(null, rec); // key is ignored by HCatOutputFormat
            }
            writer.close(cntxt);
            if (committer.needsTaskCommit(cntxt)) {
                committer.commitTask(cntxt);
            }
        } catch (IOException e) {
            abortTask(committer, cntxt);
            throw new HCatException("Failed while writing", e);
        } catch (InterruptedException e) {
            abortTask(committer, cntxt);
            throw new HCatException("Failed while writing", e);
        }
    }

    /**
     * Runs on the master after all slaves finished: commits the whole job.
     *
     * @throws HCatException if the job-level commit fails
     */
    @Override
    public void commit(WriterContext context) throws HCatException {
        try {
            new HCatOutputFormat().getOutputCommitter(
                    new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
                    .commitJob(new JobContext(context.getConf(), null));
        } catch (IOException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        } catch (InterruptedException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        }
    }

    /**
     * Runs on the master on failure: aborts the whole job.
     *
     * @throws HCatException if the job-level abort fails
     */
    @Override
    public void abort(WriterContext context) throws HCatException {
        try {
            new HCatOutputFormat().getOutputCommitter(
                    new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
                    .abortJob(new JobContext(context.getConf(), null), State.FAILED);
        } catch (IOException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        } catch (InterruptedException e) {
            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
        }
    }

    /**
     * Best-effort task abort after a failed write. Factored out of the two
     * catch blocks in {@link #write(Iterator)} that previously duplicated it.
     *
     * @throws HCatException if the abort itself fails
     */
    private static void abortTask(OutputCommitter committer,
            TaskAttemptContext cntxt) throws HCatException {
        if (null != committer) {
            try {
                committer.abortTask(cntxt);
            } catch (IOException e1) {
                throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
            }
        }
    }

    private void setVarsInConf(int id) {
        // Following two config keys are required by FileOutputFormat to work
        // correctly. In the usual Hadoop case the JobTracker sets these before
        // launching tasks; since there is no JobTracker here, we set them
        // ourselves.
        conf.setInt("mapred.task.partition", id);
        conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
    }
}
package org.apache.hcatalog.data.transfer.state;

import java.util.Random;

/**
 * Default {@link StateProvider} implementation: slave ids are generated
 * randomly.
 */
public class DefaultStateProvider implements StateProvider {

    /**
     * Single shared PRNG. The original created a new {@link Random} on every
     * call, which is wasteful and can yield correlated seeds.
     */
    private static final Random RANDOM = new Random();

    /** Lazily created singleton, guarded by the class lock in {@link #get()}. */
    private static StateProvider sp;

    /**
     * Returns a randomly generated, non-negative id.
     *
     * <p>BUGFIX: the original used {@code Math.abs(new Random().nextInt())},
     * which is still negative when {@code nextInt()} returns
     * {@code Integer.MIN_VALUE}. {@code nextInt(bound)} is always in
     * {@code [0, bound)}. The original's NumberFormat pad-then-parseInt
     * round-trip was a no-op and has been removed.</p>
     *
     * @return a non-negative random id
     */
    @Override
    public int getId() {
        return RANDOM.nextInt(Integer.MAX_VALUE);
    }

    /**
     * Returns the shared singleton instance, creating it on first use.
     */
    public static synchronized StateProvider get() {
        if (null == sp) {
            sp = new DefaultStateProvider();
        }
        return sp;
    }
}
package org.apache.hcatalog.data.transfer.state;

import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.TaskTracker;

/**
 * If an external system wants to communicate any state to slaves, it can do
 * so via this interface. One example of this in the Map-Reduce case is the
 * ids assigned by {@link JobTracker} to {@link TaskTracker}.
 */
public interface StateProvider {

    /**
     * Returns the id assigned to this slave node.
     *
     * @return id of the slave node
     */
    public int getId();
}
package org.apache.hcatalog.utils;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hcatalog.data.transfer.HCatReader;
import org.apache.hcatalog.data.transfer.ReadEntity;
import org.apache.hcatalog.data.transfer.ReaderContext;

/**
 * E2E driver for the read path, master side: prepares a {@link ReaderContext}
 * and serializes it to a file for the slaves.
 *
 * <p>Usage: {@code DataReaderMaster <props-file> <context-out-file>}.</p>
 */
public class DataReaderMaster {

    public static void main(String[] args)
            throws FileNotFoundException, IOException {

        // This config contains all the configuration the master node wants
        // to provide to HCatalog.
        Properties externalConfigs = new Properties();
        // BUGFIX: the FileReader was previously never closed.
        FileReader propsReader = new FileReader(args[0]);
        try {
            externalConfigs.load(propsReader);
        } finally {
            propsReader.close();
        }
        Map<String, String> config = new HashMap<String, String>();
        for (Entry<Object, Object> kv : externalConfigs.entrySet()) {
            config.put((String) kv.getKey(), (String) kv.getValue());
        }

        // This piece of code runs on the master node and gets the necessary
        // context.
        ReaderContext context = runsInMaster(config);

        // Master node serializes the ReaderContext and makes it available to
        // the slaves.
        ObjectOutputStream oos =
                new ObjectOutputStream(new FileOutputStream(new File(args[1])));
        try {
            oos.writeObject(context);
            oos.flush();
        } finally {
            oos.close();
        }
    }

    /**
     * Builds a reader for the configured table and prepares the read,
     * returning the context the slaves need.
     */
    private static ReaderContext runsInMaster(Map<String, String> config)
            throws HCatException {

        ReadEntity.Builder builder = new ReadEntity.Builder();
        ReadEntity entity = builder.withTable(config.get("table")).build();
        HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
        ReaderContext cntxt = reader.prepareRead();
        return cntxt;
    }
}

package org.apache.hcatalog.utils;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hcatalog.data.transfer.HCatReader;
import org.apache.hcatalog.data.transfer.ReaderContext;

/**
 * E2E driver for the read path, slave side: deserializes the
 * {@link ReaderContext} written by {@link DataReaderMaster}, reads the
 * requested splits, and dumps each record to per-split output files.
 *
 * <p>Usage: {@code DataReaderSlave <context-file> <comma-separated-split-indexes>
 * <output-prefix>}.</p>
 */
public class DataReaderSlave {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException {

        ObjectInputStream ois =
                new ObjectInputStream(new FileInputStream(new File(args[0])));
        ReaderContext cntxt = (ReaderContext) ois.readObject();
        ois.close();

        String[] inpSplitsToRead = args[1].split(",");
        List<InputSplit> splits = cntxt.getSplits();

        for (int i = 0; i < inpSplitsToRead.length; i++) {
            InputSplit split = splits.get(Integer.parseInt(inpSplitsToRead[i]));
            HCatReader reader =
                    DataTransferFactory.getHCatReader(split, cntxt.getConf());
            Iterator<HCatRecord> itr = reader.read();
            File f = new File(args[2] + "-" + i);
            f.delete(); // start from a clean per-split output file
            BufferedWriter outFile = new BufferedWriter(new FileWriter(f));
            try {
                while (itr.hasNext()) {
                    // Strip trailing whitespace so output compares stably.
                    String rec = itr.next().toString().replaceFirst("\\s+$", "");
                    System.err.println(rec);
                    outFile.write(rec + "\n");
                }
            } finally {
                // BUGFIX: writer leaked if reading threw mid-loop.
                outFile.close();
            }
        }
    }
}
package org.apache.hcatalog.utils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hcatalog.data.transfer.HCatWriter;
import org.apache.hcatalog.data.transfer.WriteEntity;
import org.apache.hcatalog.data.transfer.WriterContext;

/**
 * E2E driver for the write path, master side. In the default mode it prepares
 * a {@link WriterContext} and serializes it to a file for the slaves; invoked
 * with a third argument {@code commit} it instead deserializes that context
 * and commits the write.
 *
 * <p>Usage: {@code DataWriterMaster <props-file> <context-file> [commit]}.</p>
 *
 * <p>Cleanups vs. the original: removed the unused
 * {@code javax.imageio.stream.FileImageInputStream} import, closed the
 * ObjectInputStream on the commit path, and restored the generic types
 * stripped by the archive.</p>
 */
public class DataWriterMaster {

    public static void main(String[] args)
            throws FileNotFoundException, IOException, ClassNotFoundException {

        // This config contains all the configuration the master node wants
        // to provide to HCatalog.
        Properties externalConfigs = new Properties();
        // BUGFIX: the FileReader was previously never closed.
        FileReader propsReader = new FileReader(args[0]);
        try {
            externalConfigs.load(propsReader);
        } finally {
            propsReader.close();
        }
        Map<String, String> config = new HashMap<String, String>();
        for (Entry<Object, Object> kv : externalConfigs.entrySet()) {
            System.err.println("k: " + kv.getKey() + "\t v: " + kv.getValue());
            config.put((String) kv.getKey(), (String) kv.getValue());
        }

        if (args.length == 3 && "commit".equalsIgnoreCase(args[2])) {
            // Then, master commits if everything went well.
            ObjectInputStream ois =
                    new ObjectInputStream(new FileInputStream(new File(args[1])));
            WriterContext cntxt;
            try {
                cntxt = (WriterContext) ois.readObject();
            } finally {
                // BUGFIX: stream was previously left open before System.exit.
                ois.close();
            }
            commit(config, true, cntxt);
            System.exit(0);
        }

        // This piece of code runs on the master node and gets the necessary
        // context.
        WriterContext cntxt = runsInMaster(config);

        // Master node serializes the WriterContext and makes it available to
        // the slaves.
        File f = new File(args[1]);
        f.delete(); // start from a clean context file
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(f));
        try {
            oos.writeObject(cntxt);
            oos.flush();
        } finally {
            oos.close();
        }
    }

    /**
     * Builds a writer for the configured table and prepares the write,
     * returning the context the slaves need.
     */
    private static WriterContext runsInMaster(Map<String, String> config)
            throws HCatException {

        WriteEntity.Builder builder = new WriteEntity.Builder();
        WriteEntity entity = builder.withTable(config.get("table")).build();
        HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
        WriterContext info = writer.prepareWrite();
        return info;
    }

    /**
     * Commits the write when {@code status} is true, aborts it otherwise.
     */
    private static void commit(Map<String, String> config, boolean status,
            WriterContext cntxt) throws HCatException {

        WriteEntity.Builder builder = new WriteEntity.Builder();
        WriteEntity entity = builder.withTable(config.get("table")).build();
        HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
        if (status) {
            writer.commit(cntxt);
        } else {
            writer.abort(cntxt);
        }
    }
}
package org.apache.hcatalog.utils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hcatalog.data.transfer.HCatWriter;
import org.apache.hcatalog.data.transfer.WriterContext;

/**
 * E2E driver for the write path, slave side: deserializes the
 * {@link WriterContext} written by the master and writes records parsed from
 * a tab-separated input file.
 *
 * <p>Usage: {@code DataWriterSlave <context-file> <tsv-data-file>}.</p>
 */
public class DataWriterSlave {

    public static void main(String[] args)
            throws FileNotFoundException, IOException, ClassNotFoundException {

        ObjectInputStream ois =
                new ObjectInputStream(new FileInputStream(args[0]));
        WriterContext cntxt = (WriterContext) ois.readObject();
        ois.close();

        HCatWriter writer = DataTransferFactory.getHCatWriter(cntxt);
        writer.write(new HCatRecordItr(args[1]));
    }

    /**
     * Iterator over records parsed from a tab-separated text file. Each line
     * is expected to hold three fields: a string, an int, and a double.
     *
     * <p>BUGFIX: the original read a line on every {@code hasNext()} call, so
     * calling {@code hasNext()} twice in a row silently skipped a line; it
     * also swallowed IOExceptions with {@code printStackTrace()} and never
     * closed the reader. A one-line lookahead now makes {@code hasNext()}
     * idempotent, I/O errors are rethrown, and the reader is closed at
     * end-of-file.</p>
     */
    private static class HCatRecordItr implements Iterator<HCatRecord> {

        private final BufferedReader reader;
        /** Line pending consumption; valid only when lookedAhead is true. */
        private String curLine;
        /** True once a line has been read for the pending position. */
        private boolean lookedAhead = false;

        public HCatRecordItr(String fileName) throws FileNotFoundException {
            reader = new BufferedReader(new FileReader(new File(fileName)));
        }

        @Override
        public boolean hasNext() {
            if (!lookedAhead) {
                try {
                    curLine = reader.readLine();
                    lookedAhead = true;
                    if (null == curLine) {
                        // End of input: release the file handle.
                        reader.close();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return null != curLine;
        }

        @Override
        public HCatRecord next() {
            if (!hasNext()) {
                throw new NoSuchElementException("No more input lines");
            }
            lookedAhead = false; // consume the lookahead
            String[] fields = curLine.split("\t");
            List<Object> data = new ArrayList<Object>(3);
            data.add(fields[0]);
            data.add(Integer.parseInt(fields[1]));
            data.add(Double.parseDouble(fields[2]));
            return new DefaultHCatRecord(data);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Not allowed");
        }
    }
}
package org.apache.hcatalog.data;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hcatalog.data.transfer.HCatReader;
import org.apache.hcatalog.data.transfer.HCatWriter;
import org.apache.hcatalog.data.transfer.ReadEntity;
import org.apache.hcatalog.data.transfer.ReaderContext;
import org.apache.hcatalog.data.transfer.WriteEntity;
import org.apache.hcatalog.data.transfer.WriterContext;
import org.junit.Assert;
import org.junit.Test;

/**
 * End-to-end test of the data transfer API: writes 100 records through the
 * writer path (round-tripping the {@link WriterContext} through Java
 * serialization, as a real master/slave deployment would), then reads them
 * back through the reader path and verifies the contents.
 */
public class TestReaderWriter {

    @Test
    public void test() throws MetaException, CommandNeedRetryException,
            IOException, ClassNotFoundException {

        HiveConf conf = new HiveConf(getClass());
        Driver driver = new Driver(conf);
        SessionState.start(new CliSessionState(conf));
        driver.run("drop table mytbl");
        driver.run("create table mytbl (a string, b int)");

        // Snapshot the HiveConf into a plain map for the transfer API.
        Iterator<Entry<String, String>> itr = conf.iterator();
        Map<String, String> map = new HashMap<String, String>();
        while (itr.hasNext()) {
            Entry<String, String> kv = itr.next();
            map.put(kv.getKey(), kv.getValue());
        }

        WriterContext cntxt = runsInMaster(map);

        File writeCntxtFile = File.createTempFile("hcat-write", "temp");
        writeCntxtFile.deleteOnExit();

        // Serialize context.
        ObjectOutputStream oos =
                new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
        oos.writeObject(cntxt);
        oos.flush();
        oos.close();

        // Now, deserialize it.
        ObjectInputStream ois =
                new ObjectInputStream(new FileInputStream(writeCntxtFile));
        cntxt = (WriterContext) ois.readObject();
        ois.close();

        runsInSlave(cntxt);
        commit(map, true, cntxt);

        ReaderContext readCntxt = runsInMaster(map, false);

        // Round-trip the reader context through serialization too.
        File readCntxtFile = File.createTempFile("hcat-read", "temp");
        readCntxtFile.deleteOnExit();
        oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
        oos.writeObject(readCntxt);
        oos.flush();
        oos.close();

        ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
        readCntxt = (ReaderContext) ois.readObject();
        ois.close();

        for (InputSplit split : readCntxt.getSplits()) {
            runsInSlave(split, readCntxt.getConf());
        }
    }

    /** Master-side write setup. */
    private WriterContext runsInMaster(Map<String, String> config)
            throws HCatException {

        WriteEntity.Builder builder = new WriteEntity.Builder();
        WriteEntity entity = builder.withTable("mytbl").build();
        HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
        WriterContext info = writer.prepareWrite();
        return info;
    }

    /** Master-side read setup; the bogus param only disambiguates the overload. */
    private ReaderContext runsInMaster(Map<String, String> config, boolean bogus)
            throws HCatException {

        ReadEntity.Builder builder = new ReadEntity.Builder();
        ReadEntity entity = builder.withTable("mytbl").build();
        HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
        ReaderContext cntxt = reader.prepareRead();
        return cntxt;
    }

    /** Slave-side read: verifies the records of one split against getRecord. */
    private void runsInSlave(InputSplit split, Configuration config)
            throws HCatException {

        HCatReader reader = DataTransferFactory.getHCatReader(split, config);
        Iterator<HCatRecord> itr = reader.read();
        int i = 1;
        while (itr.hasNext()) {
            HCatRecord read = itr.next();
            HCatRecord written = getRecord(i++);
            // Argh, HCatRecord doesnt implement equals()
            Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
                    written.get(0).equals(read.get(0)));
            Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
                    written.get(1).equals(read.get(1)));
            Assert.assertEquals(2, read.size());
        }
        Assert.assertFalse(itr.hasNext());
    }

    /** Slave-side write: streams 100 generated records through the writer. */
    private void runsInSlave(WriterContext context) throws HCatException {

        HCatWriter writer = DataTransferFactory.getHCatWriter(context);
        writer.write(new HCatRecordItr());
    }

    /** Master-side commit (or abort, when status is false). */
    private void commit(Map<String, String> config, boolean status,
            WriterContext context) throws IOException {

        WriteEntity.Builder builder = new WriteEntity.Builder();
        WriteEntity entity = builder.withTable("mytbl").build();
        HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
        if (status) {
            writer.commit(context);
        } else {
            writer.abort(context);
        }
    }

    /** Deterministic record #i, used by both the writer and the verifier. */
    private static HCatRecord getRecord(int i) {
        List<Object> list = new ArrayList<Object>(2);
        list.add("Row #: " + i);
        list.add(i);
        return new DefaultHCatRecord(list);
    }

    /**
     * Supplies records 1..100 to the writer.
     *
     * <p>NOTE(review): hasNext() increments the counter, so it is not
     * idempotent; this works only because the writer alternates
     * hasNext()/next() strictly.</p>
     */
    private static class HCatRecordItr implements Iterator<HCatRecord> {

        int i = 0;

        @Override
        public boolean hasNext() {
            return i++ < 100;
        }

        @Override
        public HCatRecord next() {
            return getRecord(i);
        }

        @Override
        public void remove() {
            throw new RuntimeException();
        }
    }
}