Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2ED2A10652 for ; Thu, 10 Apr 2014 01:09:28 +0000 (UTC) Received: (qmail 3227 invoked by uid 500); 10 Apr 2014 01:09:27 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 3192 invoked by uid 500); 10 Apr 2014 01:09:27 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 3183 invoked by uid 99); 10 Apr 2014 01:09:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Apr 2014 01:09:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Apr 2014 01:09:22 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B1A6023889EC; Thu, 10 Apr 2014 01:09:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1586190 [1/2] - in /hive/branches/branch-0.13: hcatalog/ hcatalog/streaming/ hcatalog/streaming/src/ hcatalog/streaming/src/java/ hcatalog/streaming/src/java/org/ hcatalog/streaming/src/java/org/apache/ hcatalog/streaming/src/java/org/apac... Date: Thu, 10 Apr 2014 01:09:01 -0000 To: commits@hive.apache.org From: gates@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140410010902.B1A6023889EC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gates Date: Thu Apr 10 01:08:59 2014 New Revision: 1586190 URL: http://svn.apache.org/r1586190 Log: HIVE-5687 Streaming support in Hive (Roshan Naik via gates) Added: hive/branches/branch-0.13/hcatalog/streaming/ hive/branches/branch-0.13/hcatalog/streaming/pom.xml hive/branches/branch-0.13/hcatalog/streaming/src/ hive/branches/branch-0.13/hcatalog/streaming/src/java/ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatchUnAvailable.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionError.java hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html hive/branches/branch-0.13/hcatalog/streaming/src/test/ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/ hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestDelimitedInputWriter.java hive/branches/branch-0.13/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hive/branches/branch-0.13/hcatalog/streaming/src/test/sit Modified: hive/branches/branch-0.13/hcatalog/pom.xml hive/branches/branch-0.13/packaging/pom.xml hive/branches/branch-0.13/packaging/src/main/assembly/bin.xml Modified: hive/branches/branch-0.13/hcatalog/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/pom.xml?rev=1586190&r1=1586189&r2=1586190&view=diff ============================================================================== --- hive/branches/branch-0.13/hcatalog/pom.xml (original) +++ hive/branches/branch-0.13/hcatalog/pom.xml Thu Apr 10 01:08:59 2014 @@ -44,6 +44,7 @@ webhcat/java-client webhcat/svr storage-handlers/hbase + streaming Added: hive/branches/branch-0.13/hcatalog/streaming/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/pom.xml?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/pom.xml (added) +++ hive/branches/branch-0.13/hcatalog/streaming/pom.xml Thu Apr 10 01:08:59 2014 @@ -0,0 +1,124 @@ + + + + + 4.0.0 + + org.apache.hive.hcatalog + hive-hcatalog + 0.14.0-SNAPSHOT + ../pom.xml + + + hive-hcatalog-streaming + jar + Hive HCatalog Streaming + + + ../.. + + + + + hadoop-1 + + + org.apache.hadoop + hadoop-core + true + + + + + hadoop-2 + + + org.apache.hadoop + hadoop-common + true + + + org.apache.hadoop + hadoop-mapreduce-client-core + true + + + + + + + + + + org.apache.hive + hive-serde + ${project.version} + + + org.apache.hive + hive-metastore + ${project.version} + + + org.apache.hive + hive-exec + ${project.version} + + + org.apache.hive + hive-cli + ${project.version} + + + org.apache.hive.hcatalog + hive-hcatalog-core + true + ${project.version} + + + + + junit + junit + ${junit.version} + test + + + + + + ${basedir}/src/java + ${basedir}/src/test + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; + +import java.io.IOException; + +import java.util.Random; + +abstract class AbstractRecordWriter implements RecordWriter { + static final private Log LOG = LogFactory.getLog(AbstractRecordWriter.class.getName()); + + final HiveConf conf; + final HiveEndPoint endPoint; + final Table tbl; + + final HiveMetaStoreClient msClient; + RecordUpdater updater = null; + + private final int totalBuckets; + private Random rand = new Random(); + private int currentBucketId = 0; + private final Path partitionPath; + + final AcidOutputFormat outf; + + protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) + throws ConnectionError, StreamingException { + this.endPoint = endPoint; + this.conf = conf!=null ? conf + : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri); + try { + msClient = new HiveMetaStoreClient(conf); + this.tbl = msClient.getTable(endPoint.database, endPoint.table); + this.partitionPath = getPathForEndPoint(msClient, endPoint); + this.totalBuckets = tbl.getSd().getNumBuckets(); + if(totalBuckets <= 0) { + throw new StreamingException("Cannot stream to table that has not been bucketed : " + + endPoint); + } + String outFormatName = this.tbl.getSd().getOutputFormat(); + outf = (AcidOutputFormat) ReflectionUtils.newInstance(Class.forName(outFormatName), conf); + } catch (MetaException e) { + throw new ConnectionError(endPoint, e); + } catch (NoSuchObjectException e) { + throw new ConnectionError(endPoint, e); + } catch (TException e) { + throw new StreamingException(e.getMessage(), e); + } catch (ClassNotFoundException e) { + throw new StreamingException(e.getMessage(), e); + } + } + + protected AbstractRecordWriter(HiveEndPoint endPoint) + throws ConnectionError, StreamingException { + this(endPoint, HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri) ); + } + + abstract SerDe getSerde() throws SerializationError; + + @Override + public void flush() throws StreamingIOFailure { + try { + updater.flush(); + } catch (IOException e) { + throw new StreamingIOFailure("Unable to flush recordUpdater", e); + } + } + + @Override + public void clear() throws StreamingIOFailure { + } + + /** + * Creates a new record updater for the new batch + * @param minTxnId smallest Txnid in the batch + * @param maxTxnID largest Txnid in the batch + * @throws StreamingIOFailure if failed to create record updater + */ + @Override + public void newBatch(Long minTxnId, Long maxTxnID) + throws StreamingIOFailure, SerializationError { + try { + this.currentBucketId = rand.nextInt(totalBuckets); + LOG.debug("Creating Record updater"); + updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID); + } catch (IOException e) { + LOG.error("Failed creating record updater", e); + throw new StreamingIOFailure("Unable to get new record Updater", e); + } + } + + @Override + public void closeBatch() throws StreamingIOFailure { + try { + updater.close(false); + updater = null; + } catch (IOException e) { + throw new StreamingIOFailure("Unable to close recordUpdater", e); + } + } + + private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) + throws IOException, SerializationError { + try { + return outf.getRecordUpdater(partitionPath, + new AcidOutputFormat.Options(conf) + .inspector(getSerde().getObjectInspector()) + .bucket(bucketId) + .minimumTransactionId(minTxnId) + .maximumTransactionId(maxTxnID)); + } catch (SerDeException e) { + throw new SerializationError("Failed to get object inspector from Serde " + + getSerde().getClass().getName(), e); + } + } + + private Path getPathForEndPoint(HiveMetaStoreClient msClient, HiveEndPoint endPoint) + throws StreamingException { + try { + String location; + if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) { + location = msClient.getTable(endPoint.database,endPoint.table) + .getSd().getLocation(); + } else { + location = msClient.getPartition(endPoint.database, endPoint.table, + endPoint.partitionVals).getSd().getLocation(); + } + return new Path(location); + } catch (TException e) { + throw new StreamingException(e.getMessage() + + ". Unable to get path for end point: " + + endPoint.partitionVals, e); + } + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class ConnectionError extends StreamingException { + + public ConnectionError(String msg, Exception innerEx) { + super(msg, innerEx); + } + + public ConnectionError(HiveEndPoint endPoint, Exception innerEx) { + super("Error connecting to " + endPoint, innerEx); + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.io.BytesWritable; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Streaming Writer handles delimited input (eg. CSV). + * Delimited input is parsed & reordered to match column order in table + * Uses Lazy Simple Serde to process delimited input + */ +public class DelimitedInputWriter extends AbstractRecordWriter { + private final boolean reorderingNeeded; + private String delimiter; + private char serdeSeparator; + private int[] fieldToColMapping; + private final ArrayList tableColumns; + private AbstractSerDe serde = null; + + static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName()); + + /** Constructor. Uses default separator of the LazySimpleSerde + * @param colNamesForFields Column name assignment for input fields. nulls or empty + * strings in the array indicates the fields to be skipped + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, null); + } + + /** Constructor. Uses default separator of the LazySimpleSerde + * @param colNamesForFields Column name assignment for input fields. nulls or empty + * strings in the array indicates the fields to be skipped + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @param conf a Hive conf object. Can be null if not using advanced hive settings. + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, conf, + (char) LazySimpleSerDe.DefaultSeparators[0]); + } + + /** + * Constructor. Allows overriding separator of the LazySimpleSerde + * @param colNamesForFields Column name assignment for input fields + * @param delimiter input field delimiter + * @param endPoint Hive endpoint + * @param conf a Hive conf object. Set to null if not using advanced hive settings. + * @param serdeSeparator separator used when encoding data that is fed into the + * LazySimpleSerde. Ensure this separator does not occur + * in the field data + * @throws ConnectionError Problem talking to Hive + * @throws ClassNotFoundException Serde class not found + * @throws SerializationError Serde initialization/interaction failed + * @throws StreamingException Problem acquiring file system path for partition + * @throws InvalidColumn any element in colNamesForFields refers to a non existing column + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf, char serdeSeparator) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + super(endPoint, conf); + this.tableColumns = getCols(tbl); + this.serdeSeparator = serdeSeparator; + this.delimiter = delimiter; + this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns()); + this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns()); + LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint); + this.serdeSeparator = serdeSeparator; + } + + private boolean isReorderingNeeded(String delimiter, ArrayList tableColumns) { + return !( delimiter.equals(String.valueOf(getSerdeSeparator())) + && areFieldsInColOrder(fieldToColMapping) + && tableColumns.size()>=fieldToColMapping.length ); + } + + private static boolean areFieldsInColOrder(int[] fieldToColMapping) { + for(int i=0; i tableColNames) + throws InvalidColumn { + int[] result = new int[ colNamesForFields.length ]; + for(int i=0; itableColNames.size()) { + throw new InvalidColumn("Number of field names exceeds the number of columns in table"); + } + return result; + } + + // Reorder fields in record based on the order of columns in the table + protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException { + if(!reorderingNeeded) { + return record; + } + String[] reorderedFields = new String[getTableColumns().size()]; + String decoded = new String(record); + String[] fields = decoded.split(delimiter); + for (int i=0; i getTableColumns() { + return tableColumns; + } + + @Override + public void write(long transactionId, byte[] record) + throws SerializationError, StreamingIOFailure { + try { + byte[] orderedFields = reorderFields(record); + Object encodedRow = encode(orderedFields); + updater.insert(transactionId, encodedRow); + } catch (IOException e) { + throw new StreamingIOFailure("Error writing record in transaction (" + + transactionId + ")", e); + } + } + + @Override + SerDe getSerde() throws SerializationError { + if(serde!=null) { + return serde; + } + serde = createSerde(tbl, conf); + return serde; + } + + private Object encode(byte[] record) throws SerializationError { + try { + BytesWritable blob = new BytesWritable(); + blob.set(record, 0, record.length); + return serde.deserialize(blob); + } catch (SerDeException e) { + throw new SerializationError("Unable to convert byte[] record into Object", e); + } + } + + /** + * Creates LazySimpleSerde + * @return + * @throws SerializationError if serde could not be initialized + * @param tbl + */ + protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf) + throws SerializationError { + try { + Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + tableProps.setProperty("field.delim", String.valueOf(serdeSeparator)); + LazySimpleSerDe serde = new LazySimpleSerDe(); + serde.initialize(conf, tableProps); + return serde; + } catch (SerDeException e) { + throw new SerializationError("Error initializing serde", e); + } + } + + private ArrayList getCols(Table table) { + List cols = table.getSd().getCols(); + ArrayList colNames = new ArrayList(cols.size()); + for (FieldSchema col : cols) { + colNames.add(col.getName().toLowerCase()); + } + return colNames; + } + + public char getSerdeSeparator() { + return serdeSeparator; + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HeartBeatFailure.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +import java.util.Collection; +import java.util.Set; + +public class HeartBeatFailure extends StreamingException { + private Collection abortedTxns; + private Collection nosuchTxns; + + public HeartBeatFailure(Collection abortedTxns, Set nosuchTxns) { + super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns); + this.abortedTxns = abortedTxns; + this.nosuchTxns = nosuchTxns; + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,823 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Information about the hive end point (i.e. table or partition) to write to. + * A light weight object that does NOT internally hold on to resources such as + * network connections. It can be stored in Hashed containers such as sets and hash tables. + */ +public class HiveEndPoint { + public final String metaStoreUri; + public final String database; + public final String table; + public final ArrayList partitionVals; + + + static final private Log LOG = LogFactory.getLog(HiveEndPoint.class.getName()); + + /** + * + * @param metaStoreUri URI of the metastore to connect to eg: thrift://localhost:9083 + * @param database Name of the Hive database + * @param table Name of table to stream to + * @param partitionVals Indicates the specific partition to stream to. Can be null or empty List + * if streaming to a table without partitions. The order of values in this + * list must correspond exactly to the order of partition columns specified + * during the table creation. E.g. For a table partitioned by + * (continent string, country string), partitionVals could be the list + * ("Asia", "India"). + */ + public HiveEndPoint(String metaStoreUri + , String database, String table, List partitionVals) { + this.metaStoreUri = metaStoreUri; + if (database==null) { + throw new IllegalArgumentException("Database cannot be null for HiveEndPoint"); + } + this.database = database; + this.table = table; + if (table==null) { + throw new IllegalArgumentException("Table cannot be null for HiveEndPoint"); + } + this.partitionVals = partitionVals==null ? new ArrayList() + : new ArrayList( partitionVals ); + } + + + /** + * Acquire a new connection to MetaStore for streaming + * @param createPartIfNotExists If true, the partition specified in the endpoint + * will be auto created if it does not exist + * @return + * @throws ConnectionError if problem connecting + * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) + * @throws ImpersonationFailed if not able to impersonate 'proxyUser' + * @throws IOException if there was an I/O error when acquiring connection + * @throws PartitionCreationFailed if failed to create partition + * @throws InterruptedException + */ + public StreamingConnection newConnection(final boolean createPartIfNotExists) + throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed + , ImpersonationFailed , InterruptedException { + return newConnection(null, createPartIfNotExists, null); + } + + /** + * Acquire a new connection to MetaStore for streaming + * @param createPartIfNotExists If true, the partition specified in the endpoint + * will be auto created if it does not exist + * @param conf HiveConf object, set it to null if not using advanced hive settings. + * @return + * @throws ConnectionError if problem connecting + * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) + * @throws ImpersonationFailed if not able to impersonate 'proxyUser' + * @throws IOException if there was an I/O error when acquiring connection + * @throws PartitionCreationFailed if failed to create partition + * @throws InterruptedException + */ + public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf) + throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed + , ImpersonationFailed , InterruptedException { + return newConnection(null, createPartIfNotExists, conf); + } + + /** + * Acquire a new connection to MetaStore for streaming + * @param proxyUser User on whose behalf all hdfs and hive operations will be + * performed on this connection. Set it to null or empty string + * to connect as user of current process without impersonation. + * Currently this argument is not supported and must be null + * @param createPartIfNotExists If true, the partition specified in the endpoint + * will be auto created if it does not exist + * @return + * @throws ConnectionError if problem connecting + * @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false) + * @throws ImpersonationFailed if not able to impersonate 'proxyUser' + * @throws IOException if there was an I/O error when acquiring connection + * @throws PartitionCreationFailed if failed to create partition + * @throws InterruptedException + */ + private StreamingConnection newConnection(final String proxyUser, + final boolean createPartIfNotExists, final HiveConf conf) + throws ConnectionError, InvalidPartition, + InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException { + if (proxyUser ==null || proxyUser.trim().isEmpty() ) { + return newConnectionImpl(System.getProperty("user.name"), null, createPartIfNotExists, conf); + } + final UserGroupInformation ugi = getUserGroupInfo(proxyUser); + try { + return ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public StreamingConnection run() + throws ConnectionError, InvalidPartition, InvalidTable + , PartitionCreationFailed { + return newConnectionImpl(proxyUser, ugi, createPartIfNotExists, conf); + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed to impersonate '" + proxyUser + + "' when acquiring connection", e); + } + } + + + + private StreamingConnection newConnectionImpl(String proxyUser, UserGroupInformation ugi, + boolean createPartIfNotExists, HiveConf conf) + throws ConnectionError, InvalidPartition, InvalidTable + , PartitionCreationFailed { + return new ConnectionImpl(this, proxyUser, ugi, conf, createPartIfNotExists); + } + + private static UserGroupInformation getUserGroupInfo(String proxyUser) + throws ImpersonationFailed { + try { + return UserGroupInformation.createProxyUser( + proxyUser, UserGroupInformation.getLoginUser()); + } catch (IOException e) { + LOG.error("Unable to login as proxy user. Exception follows.", e); + throw new ImpersonationFailed(proxyUser,e); + } + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) { + return false; + } + + HiveEndPoint endPoint = (HiveEndPoint) o; + + if (database != null + ? !database.equals(endPoint.database) + : endPoint.database != null ) { + return false; + } + if (metaStoreUri != null + ? !metaStoreUri.equals(endPoint.metaStoreUri) + : endPoint.metaStoreUri != null ) { + return false; + } + if (!partitionVals.equals(endPoint.partitionVals)) { + return false; + } + if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0; + result = 31 * result + (database != null ? database.hashCode() : 0); + result = 31 * result + (table != null ? table.hashCode() : 0); + result = 31 * result + partitionVals.hashCode(); + return result; + } + + @Override + public String toString() { + return "{" + + "metaStoreUri='" + metaStoreUri + '\'' + + ", database='" + database + '\'' + + ", table='" + table + '\'' + + ", partitionVals=" + partitionVals + " }"; + } + + + private static class ConnectionImpl implements StreamingConnection { + private final IMetaStoreClient msClient; + private final HiveEndPoint endPt; + private final String proxyUser; + private final UserGroupInformation ugi; + + /** + * + * @param endPoint end point to connect to + * @param proxyUser can be null + * @param ugi of prody user. If ugi is null, impersonation of proxy user will be disabled + * @param conf HiveConf object + * @param createPart create the partition if it does not exist + * @throws ConnectionError if there is trouble connecting + * @throws InvalidPartition if specified partition does not exist (and createPart=false) + * @throws InvalidTable if specified table does not exist + * @throws PartitionCreationFailed if createPart=true and not able to create partition + */ + private ConnectionImpl(HiveEndPoint endPoint, String proxyUser, UserGroupInformation ugi, + HiveConf conf, boolean createPart) + throws ConnectionError, InvalidPartition, InvalidTable + , PartitionCreationFailed { + this.proxyUser = proxyUser; + this.endPt = endPoint; + this.ugi = ugi; + if (conf==null) { + conf = HiveEndPoint.createHiveConf(this.getClass(),endPoint.metaStoreUri); + } + this.msClient = getMetaStoreClient(endPoint, conf); + if (createPart && !endPoint.partitionVals.isEmpty()) { + createPartitionIfNotExists(endPoint, msClient, conf); + } + } + + /** + * Close connection + */ + @Override + public void close() { + if (ugi==null) { + msClient.close(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + msClient.close(); + return null; + } + } ); + } catch (IOException e) { + LOG.error("Error closing connection to " + endPt, e); + } catch (InterruptedException e) { + LOG.error("Interrupted when closing connection to " + endPt, e); + } + } + + + /** + * Acquires a new batch of transactions from Hive. + * + * @param numTransactions is a hint from client indicating how many transactions client needs. + * @param recordWriter Used to write record. The same writer instance can + * be shared with another TransactionBatch (to the same endpoint) + * only after the first TransactionBatch has been closed. + * Writer will be closed when the TransactionBatch is closed. + * @return + * @throws StreamingIOFailure if failed to create new RecordUpdater for batch + * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch + * @throws ImpersonationFailed failed to run command as proxyUser + * @throws InterruptedException + */ + public TransactionBatch fetchTransactionBatch(final int numTransactions, + final RecordWriter recordWriter) + throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed + , InterruptedException { + if (ugi==null) { + return fetchTransactionBatchImpl(numTransactions, recordWriter); + } + try { + return ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public TransactionBatch run() throws StreamingException { + return fetchTransactionBatchImpl(numTransactions, recordWriter); + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when acquiring Transaction Batch on endPoint " + endPt, e); + } + } + + private TransactionBatch fetchTransactionBatchImpl(int numTransactions, + RecordWriter recordWriter) + throws StreamingException, TransactionBatchUnAvailable { + return new TransactionBatchImpl(proxyUser, ugi, endPt, numTransactions, msClient + , recordWriter); + } + + + private static void createPartitionIfNotExists(HiveEndPoint ep, + IMetaStoreClient msClient, HiveConf conf) + throws InvalidTable, PartitionCreationFailed { + if (ep.partitionVals.isEmpty()) { + return; + } + SessionState state = SessionState.start(new CliSessionState(conf)); + Driver driver = new Driver(conf); + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to create partition (if not existent) " + ep); + } + + List partKeys = msClient.getTable(ep.database, ep.table) + .getPartitionKeys(); + runDDL(driver, "use " + ep.database); + String query = "alter table " + ep.table + " add if not exists partition " + + partSpecStr(partKeys, ep.partitionVals); + runDDL(driver, query); + } catch (MetaException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new PartitionCreationFailed(ep, e); + } catch (NoSuchObjectException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new InvalidTable(ep.database, ep.table); + } catch (TException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new PartitionCreationFailed(ep, e); + } catch (QueryFailedException e) { + LOG.error("Failed to create partition : " + ep, e); + throw new PartitionCreationFailed(ep, e); + } finally { + driver.close(); + try { + state.close(); + } catch (IOException e) { + LOG.warn("Error closing SessionState used to run Hive DDL."); + } + } + } + + private static boolean runDDL(Driver driver, String sql) throws QueryFailedException { + int retryCount = 1; // # of times to retry if first attempt fails + for (int attempt=0; attempt<=retryCount; ++attempt) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Running Hive Query: "+ sql); + } + driver.run(sql); + return true; + } catch (CommandNeedRetryException e) { + if (attempt==retryCount) { + throw new QueryFailedException(sql, e); + } + continue; + } + } // for + return false; + } + + private static String partSpecStr(List partKeys, ArrayList partVals) { + if (partKeys.size()!=partVals.size()) { + throw new IllegalArgumentException("Partition values:" + partVals + + ", does not match the partition Keys in table :" + partKeys ); + } + StringBuffer buff = new StringBuffer(partKeys.size()*20); + buff.append(" ( "); + int i=0; + for (FieldSchema schema : partKeys) { + buff.append(schema.getName()); + buff.append("='"); + buff.append(partVals.get(i)); + buff.append("'"); + if (i!=partKeys.size()-1) { + buff.append(","); + } + ++i; + } + buff.append(" )"); + return buff.toString(); + } + + private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf) + throws ConnectionError { + + if (endPoint.metaStoreUri!= null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri); + } + + try { + return new HiveMetaStoreClient(conf); + } catch (MetaException e) { + throw new ConnectionError("Error connecting to Hive Metastore URI: " + + endPoint.metaStoreUri, e); + } + } + + + } // class ConnectionImpl + + private static class TransactionBatchImpl implements TransactionBatch { + private final String proxyUser; + private final UserGroupInformation ugi; + private final HiveEndPoint endPt; + private final IMetaStoreClient msClient; + private final RecordWriter recordWriter; + private final List txnIds; + + private int currentTxnIndex; + private final String partNameForLock; + + private TxnState state; + private LockRequest lockRequest = null; + + /** + * Represents a batch of transactions acquired from MetaStore + * + * @param proxyUser + * @param ugi + * @param endPt + * @param numTxns + * @param msClient + * @param recordWriter + * @throws StreamingException if failed to create new RecordUpdater for batch + * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch + */ + private TransactionBatchImpl(String proxyUser, UserGroupInformation ugi, HiveEndPoint endPt + , int numTxns, IMetaStoreClient msClient, RecordWriter recordWriter) + throws StreamingException, TransactionBatchUnAvailable { + try { + if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) { + Table tableObj = msClient.getTable(endPt.database, endPt.table); + List partKeys = tableObj.getPartitionKeys(); + partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals); + } else { + partNameForLock = null; + } + this.proxyUser = proxyUser; + this.ugi = ugi; + this.endPt = endPt; + this.msClient = msClient; + this.recordWriter = recordWriter; + this.txnIds = msClient.openTxns(proxyUser, numTxns).getTxn_ids(); + this.currentTxnIndex = -1; + this.state = TxnState.INACTIVE; + recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); + } catch (TException e) { + throw new TransactionBatchUnAvailable(endPt, e); + } + } + + @Override + public String toString() { + if (txnIds==null || txnIds.isEmpty()) { + return "{}"; + } + return "TxnIds=[" + txnIds.get(0) + "src/gen/thrift" + txnIds.get(txnIds.size()-1) + + "] on endPoint= " + endPt; + } + + /** + * Activate the next available transaction in the current transaction batch + * @throws TransactionError failed to switch to next transaction + */ + @Override + public void beginNextTransaction() throws TransactionError, ImpersonationFailed, + InterruptedException { + if (ugi==null) { + beginNextTransactionImpl(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws TransactionError { + beginNextTransactionImpl(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser + + "' when switch to next Transaction for endPoint :" + endPt, e); + } + } + + private void beginNextTransactionImpl() throws TransactionError { + if ( currentTxnIndex >= txnIds.size() ) + throw new InvalidTrasactionState("No more transactions available in" + + " current batch for end point : " + endPt); + ++currentTxnIndex; + lockRequest = createLockRequest(endPt, partNameForLock, proxyUser, getCurrentTxnId()); + try { + LockResponse res = msClient.lock(lockRequest); + if (res.getState() != LockState.ACQUIRED) { + throw new TransactionError("Unable to acquire lock on " + endPt); + } + } catch (TException e) { + throw new TransactionError("Unable to acquire lock on " + endPt, e); + } + + state = TxnState.OPEN; + } + + /** + * Get Id of currently open transaction + * @return + */ + @Override + public Long getCurrentTxnId() { + return txnIds.get(currentTxnIndex); + } + + /** + * get state of current tramsaction + * @return + */ + @Override + public TxnState getCurrentTransactionState() { + return state; + } + + /** + * Remaining transactions are the ones that are not committed or aborted or active. + * Active transaction is not considered part of remaining txns. + * @return number of transactions remaining this batch. + */ + @Override + public int remainingTransactions() { + if (currentTxnIndex>=0) { + return txnIds.size() - currentTxnIndex -1; + } + return txnIds.size(); + } + + + /** + * Write record using RecordWriter + * @param record the data to be written + * @throws StreamingIOFailure I/O failure + * @throws SerializationError serialization error + * @throws ImpersonationFailed error writing on behalf of proxyUser + * @throws InterruptedException + */ + @Override + public void write(final byte[] record) + throws StreamingException, InterruptedException, + ImpersonationFailed { + if (ugi==null) { + recordWriter.write(getCurrentTxnId(), record); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + recordWriter.write(getCurrentTxnId(), record); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when writing to endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + } + + + /** + * Write records using RecordWriter + * @param records collection of rows to be written + * @throws StreamingException serialization error + * @throws ImpersonationFailed error writing on behalf of proxyUser + * @throws InterruptedException + */ + @Override + public void write(final Collection records) + throws StreamingException, InterruptedException, + ImpersonationFailed { + if (ugi==null) { + writeImpl(records); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + writeImpl(records); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxyUser '" + proxyUser + + "' when writing to endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + } + + private void writeImpl(Collection records) + throws StreamingException { + for (byte[] record : records) { + recordWriter.write(getCurrentTxnId(), record); + } + } + + + /** + * Commit the currently open transaction + * @throws TransactionError + * @throws StreamingIOFailure if flushing records failed + * @throws ImpersonationFailed if + * @throws InterruptedException + */ + @Override + public void commit() throws TransactionError, StreamingException, + ImpersonationFailed, InterruptedException { + if (ugi==null) { + commitImpl(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + commitImpl(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when committing Txn on endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + + } + + private void commitImpl() throws TransactionError, StreamingException { + try { + recordWriter.flush(); + msClient.commitTxn(txnIds.get(currentTxnIndex)); + state = TxnState.COMMITTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TxnAbortedException e) { + throw new TransactionError("Aborted transaction cannot be committed" + , e); + } catch (TException e) { + throw new TransactionError("Unable to commit transaction" + + getCurrentTxnId(), e); + } + } + + /** + * Abort the currently open transaction + * @throws TransactionError + */ + @Override + public void abort() throws TransactionError, StreamingException + , ImpersonationFailed, InterruptedException { + if (ugi==null) { + abortImpl(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + abortImpl(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when aborting Txn on endPoint :" + endPt + ". Transaction Id: " + + getCurrentTxnId(), e); + } + } + + private void abortImpl() throws TransactionError, StreamingException { + try { + recordWriter.clear(); + msClient.rollbackTxn(getCurrentTxnId()); + state = TxnState.ABORTED; + } catch (NoSuchTxnException e) { + throw new TransactionError("Unable to abort invalid transaction id : " + + getCurrentTxnId(), e); + } catch (TException e) { + throw new TransactionError("Unable to abort transaction id : " + + getCurrentTxnId(), e); + } + } + + @Override + public void heartbeat() throws StreamingException, HeartBeatFailure { + Long first = txnIds.get(currentTxnIndex); + Long last = txnIds.get(txnIds.size()-1); + try { + HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last); + if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { + throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch()); + } + } catch (TException e) { + throw new StreamingException("Failure to heartbeat on ids (" + first + "src/gen/thrift" + + last + ") on end point : " + endPt ); + } + } + + /** + * Close the TransactionBatch + * @throws StreamingIOFailure I/O failure when closing transaction batch + */ + @Override + public void close() throws StreamingException, ImpersonationFailed, InterruptedException { + if (ugi==null) { + state = TxnState.INACTIVE; + recordWriter.closeBatch(); + return; + } + try { + ugi.doAs ( + new PrivilegedExceptionAction() { + @Override + public Void run() throws StreamingException { + state = TxnState.INACTIVE; + recordWriter.closeBatch(); + return null; + } + } + ); + } catch (IOException e) { + throw new ImpersonationFailed("Failed impersonating proxy user '" + proxyUser + + "' when closing Txn Batch on endPoint :" + endPt, e); + } + } + + private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint, + String partNameForLock, String user, long txnId) { + LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + rqstBuilder.setUser(user); + rqstBuilder.setTransactionId(txnId); + + LockComponentBuilder lockCompBuilder = new LockComponentBuilder() + .setDbName(hiveEndPoint.database) + .setTableName(hiveEndPoint.table) + .setShared(); + if (partNameForLock!=null && !partNameForLock.isEmpty() ) { + lockCompBuilder.setPartitionName(partNameForLock); + } + rqstBuilder.addLockComponent(lockCompBuilder.build()); + + return rqstBuilder.build(); + } + } // class TransactionBatchImpl + + static HiveConf createHiveConf(Class clazz, String metaStoreUri) { + HiveConf conf = new HiveConf(clazz); + conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, + "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); + if (metaStoreUri!= null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); + } + return conf; + } + +} // class HiveEndPoint Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ImpersonationFailed.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class ImpersonationFailed extends StreamingException { + public ImpersonationFailed(String username, Exception e) { + super("Failed to impersonate user " + username, e); + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidColumn.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class InvalidColumn extends StreamingException { + + public InvalidColumn(String msg) { + super(msg); + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidPartition.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class InvalidPartition extends StreamingException { + + public InvalidPartition(String partitionName, String partitionValue) { + super("Invalid partition: Name=" + partitionName + + ", Value=" + partitionValue); + } + +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class InvalidTable extends StreamingException { + + private static String makeMsg(String db, String table) { + return "Invalid table db:" + db + ", table:" + table; + } + + public InvalidTable(String db, String table) { + super(makeMsg(db,table), null); + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTrasactionState.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class InvalidTrasactionState extends TransactionError { + public InvalidTrasactionState(String msg) { + super(msg); + } + +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/PartitionCreationFailed.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class PartitionCreationFailed extends StreamingException { + public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) { + super("Failed to create partition " + endPoint, cause); + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/QueryFailedException.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +import org.apache.hadoop.hive.ql.CommandNeedRetryException; + +public class QueryFailedException extends StreamingException { + String query; + public QueryFailedException(String query, CommandNeedRetryException e) { + super("Query failed: " + query + ". Due to :" + e.getMessage(), e); + this.query = query; + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + + +public interface RecordWriter { + + /** Writes using a hive RecordUpdater + * + * @param transactionId the ID of the Txn in which the write occurs + * @param record the record to be written + */ + public void write(long transactionId, byte[] record) throws StreamingException; + + /** Flush records from buffer. Invoked by TransactionBatch.commit() */ + public void flush() throws StreamingException; + + /** Clear bufferred writes. Invoked by TransactionBatch.abort() */ + public void clear() throws StreamingException; + + /** Acquire a new RecordUpdater. Invoked when + * StreamingConnection.fetchTransactionBatch() is called */ + public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException; + + /** Close the RecordUpdater. Invoked by TransactionBatch.close() */ + public void closeBatch() throws StreamingException; +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/SerializationError.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + + +public class SerializationError extends StreamingException { + public SerializationError(String msg, Exception e) { + super(msg,e); + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +/** + * Represents a connection to a HiveEndPoint. Used to acquire transaction batches. + */ +public interface StreamingConnection { + + /** + * Acquires a new batch of transactions from Hive. + + * @param numTransactionsHint is a hint from client indicating how many transactions client needs. + * @param writer Used to write record. The same writer instance can + * be shared with another TransactionBatch (to the same endpoint) + * only after the first TransactionBatch has been closed. + * Writer will be closed when the TransactionBatch is closed. + * @return + * @throws ConnectionError + * @throws InvalidPartition + * @throws StreamingException + * @return a batch of transactions + */ + public TransactionBatch fetchTransactionBatch(int numTransactionsHint, + RecordWriter writer) + throws ConnectionError, StreamingException, InterruptedException; + + /** + * Close connection + */ + public void close(); + +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingException.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +public class StreamingException extends Exception { + public StreamingException(String msg, Exception cause) { + super(msg, cause); + } + public StreamingException(String msg) { + super(msg); + } +} Added: hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java?rev=1586190&view=auto ============================================================================== --- hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java (added) +++ hive/branches/branch-0.13/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingIOFailure.java Thu Apr 10 01:08:59 2014 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + + +public class StreamingIOFailure extends StreamingException { + + public StreamingIOFailure(String msg, Exception cause) { + super(msg, cause); + } + + public StreamingIOFailure(String msg) { + super(msg); + } +}