asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [2/2] asterixdb git commit: Subset hive-exec, make twitter4j a provided dep.
Date Fri, 13 Jan 2017 23:17:57 GMT
Subset hive-exec, make twitter4j a provided dep.

Change-Id: Iee4276f540ec8552181bfb452882654b5faa17df
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1427
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/5226ca87
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/5226ca87
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/5226ca87

Branch: refs/heads/master
Commit: 5226ca87421b3ade4f65be206b5eb291ea6c4667
Parents: 74271c0
Author: Ian Maxon <imaxon@apache.org>
Authored: Fri Jan 13 12:19:44 2017 -0800
Committer: Ian Maxon <imaxon@apache.org>
Committed: Fri Jan 13 15:17:30 2017 -0800

----------------------------------------------------------------------
 asterixdb/asterix-app/pom.xml                   |    2 -
 asterixdb/asterix-external-data/pom.xml         |   14 +-
 .../external/indexing/IndexingConstants.java    |    2 +-
 .../record/reader/hdfs/RCLookupReader.java      |    2 +-
 .../external/util/ExternalDataConstants.java    |    2 +-
 .../apache/asterix/external/util/HDFSUtils.java |    2 +-
 asterixdb/asterix-hivecompat/pom.xml            |   72 +
 .../apache/asterix/hivecompat/io/CodecPool.java |  164 ++
 .../hivecompat/io/InputFormatChecker.java       |   40 +
 .../hivecompat/io/NonSyncDataInputBuffer.java   |  512 +++++
 .../hivecompat/io/NonSyncDataOutputBuffer.java  |   88 +
 .../apache/asterix/hivecompat/io/RCFile.java    | 2049 ++++++++++++++++++
 .../hivecompat/io/RCFileInputFormat.java        |   83 +
 .../hivecompat/io/RCFileRecordReader.java       |  213 ++
 .../io/SchemaAwareCompressionInputStream.java   |   42 +
 .../io/SchemaAwareCompressionOutputStream.java  |   44 +
 asterixdb/asterix-yarn/pom.xml                  |    4 +-
 asterixdb/pom.xml                               |   45 +
 18 files changed, 3370 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 0d991da..d4a9e5f 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -155,8 +155,6 @@
           </ignoredUsedUndeclaredDependencies>
           <usedDependencies>
             <usedDependency>org.apache.hadoop:hadoop-common</usedDependency>
-            <usedDependency>commons-lang:commons-lang</usedDependency>
-            <usedDependency>com.google.guava:guava</usedDependency>
             <usedDependency>org.apache.asterix:asterix-external-data</usedDependency>
           </usedDependencies>
         </configuration>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index da01510..c603571 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -229,14 +229,21 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-hivecompat</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.twitter4j</groupId>
       <artifactId>twitter4j-core</artifactId>
       <version>4.0.3</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.twitter4j</groupId>
       <artifactId>twitter4j-stream</artifactId>
       <version>4.0.3</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>net.java.dev.rome</groupId>
@@ -256,7 +263,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
+      <artifactId>hive-serde</artifactId>
       <version>0.13.0</version>
     </dependency>
     <dependency>
@@ -393,5 +400,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.5</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
index f315950..a269144 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
@@ -41,7 +41,7 @@ public class IndexingConstants {
     public static final String EXTERNAL_FILE_INDEX_NAME_SUFFIX = "FilesIndex";
     public static final String KEY_INPUT_FORMAT = "input-format";
     public static final String INPUT_FORMAT_RC = "rc-input-format";
-    public static final String INPUT_FORMAT_RC_FULLY_QUALIFIED = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+    public static final String INPUT_FORMAT_RC_FULLY_QUALIFIED = "org.apache.asterix.hivecompat.io.RCFileInputFormat";
 
     //Field Types
     public static final IAType FILE_NUMBER_FIELD_TYPE = BuiltinType.AINT32;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
index 95d76ba..3f9d90e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
@@ -27,7 +27,7 @@ import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.asterix.hivecompat.io.RCFile.Reader;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 881c498..a89d13e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -107,7 +107,7 @@ public class ExternalDataConstants {
      */
     public static final String CLASS_NAME_TEXT_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
     public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat";
-    public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+    public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.asterix.hivecompat.io.RCFileInputFormat";
     public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem";
     /**
      * input formats aliases

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 42b582b..9556054 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.asterix.hivecompat.io.RCFileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/pom.xml b/asterixdb/asterix-hivecompat/pom.xml
new file mode 100644
index 0000000..1180763
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/pom.xml
@@ -0,0 +1,72 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>apache-asterixdb</artifactId>
+    <groupId>org.apache.asterix</groupId>
+    <version>0.8.9-SNAPSHOT</version>
+  </parent>
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+  <artifactId>asterix-hivecompat</artifactId>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <excludes>**/hivecompat/**/*</excludes>
+        </configuration>
+       </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.shims</groupId>
+      <artifactId>hive-shims-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/CodecPool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/CodecPool.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/CodecPool.java
new file mode 100644
index 0000000..aab4c72
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/CodecPool.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.hivecompat.io;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse (possibly
+ * native) compression/decompression codecs.
+ */
+public final class CodecPool {
+  private static final Log LOG = LogFactory.getLog(CodecPool.class);
+
+  /**
+   * A global compressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
+      new HashMap<Class<Compressor>, List<Compressor>>();
+
+  /**
+   * A global decompressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
+      new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+      Class<? extends T> codecClass) {
+    T codec = null;
+
+    // Check if an appropriate codec is available
+    synchronized (pool) {
+      if (pool.containsKey(codecClass)) {
+        List<T> codecList = pool.get(codecClass);
+
+        if (codecList != null) {
+          synchronized (codecList) {
+            if (!codecList.isEmpty()) {
+              codec = codecList.remove(codecList.size() - 1);
+            }
+          }
+        }
+      }
+    }
+
+    return codec;
+  }
+
+  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+    if (codec != null) {
+      Class<T> codecClass = (Class<T>) codec.getClass();
+      synchronized (pool) {
+        if (!pool.containsKey(codecClass)) {
+          pool.put(codecClass, new ArrayList<T>());
+        }
+
+        List<T> codecList = pool.get(codecClass);
+        synchronized (codecList) {
+          codecList.add(codec);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   * 
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Compressor</code>
+   * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
+   *         from the pool or a new one
+   */
+  public static Compressor getCompressor(CompressionCodec codec) {
+    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
+    if (compressor == null) {
+      compressor = codec.createCompressor();
+      LOG.info("Got brand-new compressor");
+    } else {
+      LOG.debug("Got recycled compressor");
+    }
+    return compressor;
+  }
+
+  /**
+   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   * 
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Decompressor</code>
+   * @return <code>Decompressor</code> for the given
+   *         <code>CompressionCodec</code> the pool or a new one
+   */
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
+        .getDecompressorType());
+    if (decompressor == null) {
+      decompressor = codec.createDecompressor();
+      LOG.info("Got brand-new decompressor");
+    } else {
+      LOG.debug("Got recycled decompressor");
+    }
+    return decompressor;
+  }
+
+  /**
+   * Return the {@link Compressor} to the pool.
+   * 
+   * @param compressor
+   *          the <code>Compressor</code> to be returned to the pool
+   */
+  public static void returnCompressor(Compressor compressor) {
+    if (compressor == null) {
+      return;
+    }
+    compressor.reset();
+    payback(COMPRESSOR_POOL, compressor);
+  }
+
+  /**
+   * Return the {@link Decompressor} to the pool.
+   * 
+   * @param decompressor
+   *          the <code>Decompressor</code> to be returned to the pool
+   */
+  public static void returnDecompressor(Decompressor decompressor) {
+    if (decompressor == null) {
+      return;
+    }
+    decompressor.reset();
+    payback(DECOMPRESSOR_POOL, decompressor);
+  }
+
+  private CodecPool() {
+    // prevent instantiation
+  }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/InputFormatChecker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/InputFormatChecker.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/InputFormatChecker.java
new file mode 100644
index 0000000..4ac8a59
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/InputFormatChecker.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Check for validity of the input files.
+ */
+public interface InputFormatChecker {
+
+  /**
+   * This method is used to validate the input files.
+   * 
+   */
+  boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataInputBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataInputBuffer.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataInputBuffer.java
new file mode 100644
index 0000000..8cb890b
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataInputBuffer.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.PushbackInputStream;
+import java.io.UTFDataFormatException;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
+
+/**
+ * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataInputBuffer extends FilterInputStream implements
+    DataInput {
+
+  private final NonSyncByteArrayInputStream buffer;
+
+  byte[] buff = new byte[16];
+
+  /** Constructs a new empty buffer. */
+  public NonSyncDataInputBuffer() {
+    this(new NonSyncByteArrayInputStream());
+  }
+
+  private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /** Resets the data that the buffer reads. */
+  public void reset(byte[] input, int length) {
+    buffer.reset(input, 0, length);
+  }
+
+  /** Resets the data that the buffer reads. */
+  public void reset(byte[] input, int start, int length) {
+    buffer.reset(input, start, length);
+  }
+
+  /** Returns the current position in the input. */
+  public int getPosition() {
+    return buffer.getPosition();
+  }
+
+  /** Returns the length of the input. */
+  public int getLength() {
+    return buffer.getLength();
+  }
+
+  /**
+   * Reads bytes from the source stream into the byte array <code>buffer</code>.
+   * The number of bytes actually read is returned.
+   * 
+   * @param buffer
+   *          the buffer to read bytes into
+   * @return the number of bytes actually read or -1 if end of stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final int read(byte[] buffer) throws IOException {
+    return in.read(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Read at most <code>length</code> bytes from this DataInputStream and stores
+   * them in byte array <code>buffer</code> starting at <code>offset</code>.
+   * Answer the number of bytes actually read or -1 if no bytes were read and
+   * end of stream was encountered.
+   * 
+   * @param buffer
+   *          the byte array in which to store the read bytes.
+   * @param offset
+   *          the offset in <code>buffer</code> to store the read bytes.
+   * @param length
+   *          the maximum number of bytes to store in <code>buffer</code>.
+   * @return the number of bytes actually read or -1 if end of stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final int read(byte[] buffer, int offset, int length)
+      throws IOException {
+    return in.read(buffer, offset, length);
+  }
+
+  /**
+   * Reads a boolean from this stream.
+   * 
+   * @return the next boolean value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final boolean readBoolean() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return temp != 0;
+  }
+
+  /**
+   * Reads an 8-bit byte value from this stream.
+   * 
+   * @return the next byte value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final byte readByte() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return (byte) temp;
+  }
+
+  /**
+   * Reads a 16-bit character value from this stream.
+   * 
+   * @return the next <code>char</code> value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  private int readToBuff(int count) throws IOException {
+    int offset = 0;
+
+    while (offset < count) {
+      int bytesRead = in.read(buff, offset, count - offset);
+      if (bytesRead == -1) {
+        return bytesRead;
+      }
+      offset += bytesRead;
+    }
+    return offset;
+  }
+
+  @Override
+  public final char readChar() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+
+  }
+
+  /**
+   * Reads a 64-bit <code>double</code> value from this stream.
+   * 
+   * @return the next <code>double</code> value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final double readDouble() throws IOException {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  /**
+   * Reads a 32-bit <code>float</code> value from this stream.
+   * 
+   * @return the next <code>float</code> value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final float readFloat() throws IOException {
+    return Float.intBitsToFloat(readInt());
+  }
+
+  /**
+   * Reads bytes from this stream into the byte array <code>buffer</code>. This
+   * method will block until <code>buffer.length</code> number of bytes have
+   * been read.
+   * 
+   * @param buffer
+   *          to read bytes into
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final void readFully(byte[] buffer) throws IOException {
+    readFully(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads bytes from this stream and stores them in the byte array
+   * <code>buffer</code> starting at the position <code>offset</code>. This
+   * method blocks until <code>count</code> bytes have been read.
+   * 
+   * @param buffer
+   *          the byte array into which the data is read
+   * @param offset
+   *          the offset the operation start at
+   * @param length
+   *          the maximum number of bytes to read
+   * 
+   * @throws IOException
+   *           if a problem occurs while reading from this stream
+   * @throws EOFException
+   *           if reaches the end of the stream before enough bytes have been
+   *           read
+   */
+  @Override
+  public final void readFully(byte[] buffer, int offset, int length)
+      throws IOException {
+    if (length < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (length == 0) {
+      return;
+    }
+    if (in == null || buffer == null) {
+      throw new NullPointerException("Null Pointer to underlying input stream");
+    }
+
+    if (offset < 0 || offset > buffer.length - length) {
+      throw new IndexOutOfBoundsException();
+    }
+    while (length > 0) {
+      int result = in.read(buffer, offset, length);
+      if (result < 0) {
+        throw new EOFException();
+      }
+      offset += result;
+      length -= result;
+    }
+  }
+
+  /**
+   * Reads a 32-bit integer value from this stream.
+   * 
+   * @return the next <code>int</code> value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final int readInt() throws IOException {
+    if (readToBuff(4) < 0) {
+      throw new EOFException();
+    }
+    return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+        | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+  }
+
+  /**
+   * Answers a <code>String</code> representing the next line of text available
+   * in this BufferedReader. A line is represented by 0 or more characters
+   * followed by <code>'\n'</code>, <code>'\r'</code>, <code>"\n\r"</code> or
+   * end of stream. The <code>String</code> does not include the newline
+   * sequence.
+   * 
+   * @return the contents of the line or null if no characters were read before
+   *         end of stream.
+   * 
+   * @throws IOException
+   *           If the DataInputStream is already closed or some other IO error
+   *           occurs.
+   * 
+   * @deprecated Use BufferedReader
+   */
+  @Deprecated
+  @Override
+  public final String readLine() throws IOException {
+    StringBuilder line = new StringBuilder(80); // Typical line length
+    boolean foundTerminator = false;
+    while (true) {
+      int nextByte = in.read();
+      switch (nextByte) {
+      case -1:
+        if (line.length() == 0 && !foundTerminator) {
+          return null;
+        }
+        return line.toString();
+      case (byte) '\r':
+        if (foundTerminator) {
+          ((PushbackInputStream) in).unread(nextByte);
+          return line.toString();
+        }
+        foundTerminator = true;
+        /* Have to be able to peek ahead one byte */
+        if (!(in.getClass() == PushbackInputStream.class)) {
+          in = new PushbackInputStream(in);
+        }
+        break;
+      case (byte) '\n':
+        return line.toString();
+      default:
+        if (foundTerminator) {
+          ((PushbackInputStream) in).unread(nextByte);
+          return line.toString();
+        }
+        line.append((char) nextByte);
+      }
+    }
+  }
+
+  /**
+   * Reads eight bytes from this stream and combines them, most significant
+   * byte first, into a 64-bit <code>long</code>.
+   *
+   * @return the <code>long</code> assembled from the next eight bytes.
+   *
+   * @throws IOException
+   *           If reading fails; an EOFException is thrown when
+   *           <code>readToBuff</code> reports a negative result.
+   */
+  @Override
+  public final long readLong() throws IOException {
+    if (readToBuff(8) < 0) {
+      throw new EOFException();
+    }
+    long value = 0L;
+    // Shift the accumulated bytes up and append the next one as unsigned.
+    for (int i = 0; i < 8; i++) {
+      value = (value << 8) | (buff[i] & 0xffL);
+    }
+    return value;
+  }
+
+  /**
+   * Reads a 16-bit <code>short</code> value from this stream.
+   * 
+   * @return the next <code>short</code> value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final short readShort() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+  }
+
+  /**
+   * Reads a single byte from the underlying stream and returns it as a
+   * non-negative int.
+   *
+   * @return the value returned by the underlying <code>read()</code>.
+   *
+   * @throws IOException
+   *           If reading fails; an EOFException is thrown when the underlying
+   *           <code>read()</code> returns a negative value.
+   */
+  @Override
+  public final int readUnsignedByte() throws IOException {
+    int value = in.read();
+    if (value >= 0) {
+      return value;
+    }
+    throw new EOFException();
+  }
+
+  /**
+   * Reads a 16-bit unsigned <code>short</code> value from this stream and
+   * returns it as an int.
+   * 
+   * @return the next unsigned <code>short</code> value from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final int readUnsignedShort() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+  }
+
+  /**
+   * Reads a UTF format String from this Stream.
+   * 
+   * @return the next UTF String from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  @Override
+  public final String readUTF() throws IOException {
+    return decodeUTF(readUnsignedShort());
+  }
+
+  /** Decodes <code>utfSize</code> encoded bytes read from this stream. */
+  String decodeUTF(int utfSize) throws IOException {
+    return decodeUTF(utfSize, this);
+  }
+
+  /**
+   * Reads exactly <code>utfSize</code> bytes from <code>in</code> and decodes
+   * them into a String.
+   */
+  private static String decodeUTF(int utfSize, DataInput in) throws IOException {
+    byte[] encoded = new byte[utfSize];
+    in.readFully(encoded, 0, utfSize);
+
+    // The decoded text never has more chars than there are encoded bytes.
+    char[] decoded = new char[utfSize];
+    return convertUTF8WithBuf(encoded, decoded, 0, utfSize);
+  }
+
+  /**
+   * Reads a UTF format String from the DataInput Stream <code>in</code>.
+   * 
+   * @param in
+   *          the input stream to read from
+   * @return the next UTF String from the source stream.
+   * 
+   * @throws IOException
+   *           If a problem occurs reading from this DataInputStream.
+   * 
+   */
+  public static final String readUTF(DataInput in) throws IOException {
+    return decodeUTF(in.readUnsignedShort(), in);
+  }
+
+  /**
+   * Skips up to <code>count</code> bytes of the underlying stream, calling its
+   * <code>skip</code> repeatedly until the requested amount is reached or a
+   * call skips nothing.
+   *
+   * @param count
+   *          the number of bytes to skip.
+   * @return the number of bytes actually skipped.
+   *
+   * @throws IOException
+   *           If the underlying stream fails; an EOFException is thrown when
+   *           the running total ends up negative.
+   */
+  @Override
+  public final int skipBytes(int count) throws IOException {
+    int total = 0;
+    while (total < count) {
+      long step = in.skip(count - total);
+      if (step == 0) {
+        // No further progress is possible.
+        break;
+      }
+      total += step;
+    }
+    if (total < 0) {
+      throw new EOFException();
+    }
+    return total;
+  }
+
+  /**
+   * Decodes <code>utfSize</code> bytes of <code>buf</code>, starting at
+   * <code>offset</code>, into a String. One-, two- and three-byte sequences
+   * are accepted; any other lead byte, a truncated sequence, or a malformed
+   * continuation byte is rejected.
+   *
+   * @param buf
+   *          the encoded bytes
+   * @param out
+   *          scratch array receiving the decoded chars; it needs room for up
+   *          to <code>utfSize</code> chars
+   * @param offset
+   *          index in <code>buf</code> of the first encoded byte
+   * @param utfSize
+   *          number of encoded bytes to decode
+   * @return the decoded String.
+   *
+   * @throws UTFDataFormatException
+   *           If the bytes are not a valid encoding.
+   */
+  public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset,
+      int utfSize) throws UTFDataFormatException {
+    int count = 0, s = 0, a;
+    while (count < utfSize) {
+      if ((out[s] = (char) buf[offset + count++]) < '\u0080') {
+        // Single-byte sequence: the char is already stored in out[s].
+        s++;
+      } else if (((a = out[s]) & 0xe0) == 0xc0) {
+        // Two-byte sequence: one continuation byte must follow.
+        if (count >= utfSize) {
+          throw new UTFDataFormatException();
+        }
+        // Continuation bytes are addressed relative to offset, like the lead byte.
+        int b = buf[offset + count++];
+        if ((b & 0xC0) != 0x80) {
+          throw new UTFDataFormatException();
+        }
+        out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
+      } else if ((a & 0xf0) == 0xe0) {
+        // Three-byte sequence: two continuation bytes must follow.
+        if (count + 1 >= utfSize) {
+          throw new UTFDataFormatException();
+        }
+        int b = buf[offset + count++];
+        int c = buf[offset + count++];
+        if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException();
+        }
+        out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
+      } else {
+        throw new UTFDataFormatException();
+      }
+    }
+    return new String(out, 0, s);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataOutputBuffer.java
b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataOutputBuffer.java
new file mode 100644
index 0000000..878f130
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/NonSyncDataOutputBuffer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.hivecompat.io;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
+
+/**
+ * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataOutputBuffer extends DataOutputStream {
+
+  private final NonSyncByteArrayOutputStream buffer;
+
+  /** Constructs a new empty buffer. */
+  public NonSyncDataOutputBuffer() {
+    this(new NonSyncByteArrayOutputStream());
+  }
+
+  private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /**
+   * Returns the current contents of the buffer. Data is only valid to
+   * {@link #getLength()}.
+   */
+  public byte[] getData() {
+    return buffer.getData();
+  }
+
+  /** Returns the length of the valid data currently in the buffer. */
+  public int getLength() {
+    return buffer.getLength();
+  }
+
+  /** Resets the buffer to empty. */
+  public NonSyncDataOutputBuffer reset() {
+    written = 0;
+    buffer.reset();
+    return this;
+  }
+
+  /** Writes bytes from a DataInput directly into the buffer. */
+  public void write(DataInput in, int length) throws IOException {
+    buffer.write(in, length);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    buffer.write(b);
+    incCount(1);
+  }
+
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    buffer.write(b, off, len);
+    incCount(len);
+  }
+
+  private void incCount(int value) {
+    if (written + value < 0) {
+      written = Integer.MAX_VALUE;
+    } else {
+      written += value;
+    }
+  }
+}


Mime
View raw message