drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [2/4] drill git commit: DRILL-1512: Avro record reader
Date Wed, 15 Apr 2015 20:34:23 GMT
DRILL-1512: Avro record reader

Reader for Avro data files.

Supports:
- All primitive types
- Arrays
- Nested records
- Enums

Unimplemented:
- Endpoint affinity
- Recursive data types
- Complex types: Maps, Fixed, Unions


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/55a9a59d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/55a9a59d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/55a9a59d

Branch: refs/heads/master
Commit: 55a9a59d409ee93e42fcdbdc12afd8fb0caffe3f
Parents: 0f9887d
Author: Andrew Selden <andrew.selden@elasticsearch.com>
Authored: Fri Oct 10 16:41:59 2014 -0700
Committer: Steven Phillips <smp@apache.org>
Committed: Wed Apr 15 12:39:46 2015 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |  10 +
 .../drill/exec/store/avro/AvroFormatConfig.java |  39 +++
 .../drill/exec/store/avro/AvroFormatPlugin.java | 128 ++++++++
 .../drill/exec/store/avro/AvroGroupScan.java    | 208 ++++++++++++
 .../drill/exec/store/avro/AvroRecordReader.java | 324 +++++++++++++++++++
 .../exec/store/avro/AvroScanBatchCreator.java   |  52 +++
 .../drill/exec/store/avro/AvroSubScan.java      | 142 ++++++++
 .../drill/exec/store/avro/AvroTypeHelper.java   | 184 +++++++++++
 .../drill/exec/store/avro/MapOrListWriter.java  | 105 ++++++
 .../resources/bootstrap-storage-plugins.json    |   6 +
 .../drill/exec/store/avro/AvroFormatTest.java   | 129 ++++++++
 .../drill/exec/store/avro/AvroTestUtil.java     | 270 ++++++++++++++++
 .../resources/bootstrap-storage-plugins.json    |   3 +
 .../apache/drill/exec/proto/UserBitShared.java  |  16 +-
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 16 files changed, 1617 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index fdd24ef..f5313ca 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -353,6 +353,16 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <version>1.7.7</version>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
new file mode 100644
index 0000000..4a2cfb9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+/**
+ * Format plugin config for Avro data files.
+ */
+@JsonTypeName("avro")
+public class AvroFormatConfig implements FormatPluginConfig {
+
+  @Override
+  public int hashCode() {
+    return 101; // XXX - WHAT IS THIS SUPPOSED TO BE?
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof AvroFormatConfig;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
new file mode 100644
index 0000000..4fe1f71
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Format plugin for Avro data files.
+ */
+public class AvroFormatPlugin implements FormatPlugin {
+
+  private final String name;
+  private final DrillbitContext context;
+  private final DrillFileSystem fs;
+  private final StoragePluginConfig storagePluginConfig;
+  private final AvroFormatConfig formatConfig;
+  private final BasicFormatMatcher matcher;
+
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+                          StoragePluginConfig storagePluginConfig) {
+    this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+  }
+
+  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+                          StoragePluginConfig storagePluginConfig, AvroFormatConfig formatConfig) {
+    this.name = name;
+    this.context = context;
+    this.fs = fs;
+    this.storagePluginConfig = storagePluginConfig;
+    this.formatConfig = formatConfig;
+
+    // XXX - What does 'compressible' mean in this context?
+    this.matcher = new BasicFormatMatcher(this, fs, Lists.newArrayList("avro"), false);
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(final PhysicalOperator child, final String location) throws IOException {
+    throw new UnsupportedOperationException("Unimplemented");
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(final FileSelection selection) throws IOException {
+    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(final FileSelection selection, final List<SchemaPath> columns) throws IOException {
+    return new AvroGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+  }
+
+  @Override
+  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public AvroFormatConfig getConfig() {
+    return formatConfig;
+  }
+
+  @Override
+  public StoragePluginConfig getStorageConfig() {
+    return storagePluginConfig;
+  }
+
+  @Override
+  public DrillFileSystem getFileSystem() {
+    return fs;
+  }
+
+  @Override
+  public DrillbitContext getContext() {
+    return context;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
new file mode 100644
index 0000000..fcc1f94
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroGroupScan.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+//import org.apache.drill.exec.store.avro.AvroSubScan.AvroSubScanSpec;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Group scan implementation for Avro data files.
+ */
+@JsonTypeName("avro-scan")
+public class AvroGroupScan extends AbstractGroupScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroGroupScan.class);
+
+  private final AvroFormatPlugin formatPlugin;
+  private final AvroFormatConfig formatConfig;
+  private final List<SchemaPath> columns;
+  private final FileSystem fs;
+  private final List<ReadEntryWithPath> entries;
+  private final String selectionRoot;
+
+  //private Map<Integer, List<AvroSubScanSpec>> endpointMappings;
+
+  private List<EndpointAffinity> endpointAffinities;
+
+  @JsonCreator
+  public AvroGroupScan(@JsonProperty("entries") final List<ReadEntryWithPath> entries,
+                       @JsonProperty("storage") final StoragePluginConfig storageConfig,
+                       @JsonProperty("format") final FormatPluginConfig formatConfig,
+                       @JacksonInject final StoragePluginRegistry engineRegistry,
+                       @JsonProperty("columns") final List<SchemaPath> columns,
+                       @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
+
+    this.columns = columns;
+    final AvroFormatConfig afc;
+    if (formatConfig == null) {
+      afc = new AvroFormatConfig();
+    } else {
+      afc = (AvroFormatConfig) formatConfig;
+    }
+    Preconditions.checkNotNull(storageConfig);
+    Preconditions.checkNotNull(afc);
+    this.formatPlugin = (AvroFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, afc);
+    Preconditions.checkNotNull(this.formatPlugin);
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.formatConfig = formatPlugin.getConfig();
+    this.entries = entries;
+    this.selectionRoot = selectionRoot;
+  }
+
+  public AvroGroupScan(final List<FileStatus> files, final AvroFormatPlugin formatPlugin,
+                       final String selectionRoot, final List<SchemaPath> columns) throws IOException {
+
+    this.formatPlugin = formatPlugin;
+    this.columns = columns;
+    this.formatConfig = formatPlugin.getConfig();
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.selectionRoot = selectionRoot;
+
+    this.entries = Lists.newArrayList();
+    for (final FileStatus fs : files) {
+      entries.add(new ReadEntryWithPath(fs.getPath().toString()));
+    }
+  }
+
+  @JsonProperty("format")
+  public AvroFormatConfig getFormatConfig() {
+    return this.formatConfig;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return this.formatPlugin.getStorageConfig();
+  }
+
+  private AvroGroupScan(final AvroGroupScan that, final List<SchemaPath> columns) {
+    this.columns = (columns == null) ? that.columns : columns;
+    this.entries = that.entries;
+    this.formatConfig = that.formatConfig;
+    this.formatPlugin = that.formatPlugin;
+    this.fs = that.fs;
+    this.selectionRoot = that.selectionRoot;
+
+    // XXX - DON'T FORGET TO ADD THESE AFTER WE'VE IMPLEMENTED AFFINITY
+    //this.endpointAffinities = that.endpointAffinities;
+    //this.mappings = that.mappings;
+    //this.rowCount = that.rowCount;
+    //this.rowGroupInfos = that.rowGroupInfos;
+    //this.columnValueCounts = that.columnValueCounts;
+  }
+
+  @Override
+  public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+    // XXX - Unimplemented
+    logger.warn("AvroGroupScan.applyAssignments() is not implemented");
+  }
+
+  @Override
+  public AvroSubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
+
+    final AvroSubScan sub = new AvroSubScan(formatPlugin, columns, selectionRoot);
+
+    // XXX - This is a temporary hack just to get something working. Need to revisit sub-scan specs
+    //       once we work out affinity and endpoints.
+    sub.setEntry(entries.get(0));
+    sub.setFileSystem(fs);
+
+    return sub;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    // XXX - Finish
+    return 1;
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    // XXX - Is 0 the correct value for second arg? What if I don't know the row count a priori?
+    return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, 0, 1, 1);
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    // XXX - Unimplemented
+    if (endpointAffinities != null) {
+      return endpointAffinities;
+    }
+    return Collections.emptyList();
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new AvroGroupScan(this, null);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public String toString() {
+    return "AvroGroupScan [entries=" + entries +
+            ", selectionRoot=" + selectionRoot +
+            ", columns=" + columns + "]";
+  }
+
+  @Override
+  public GroupScan clone(final List<SchemaPath> columns) {
+    return new AvroGroupScan(this, columns);
+  }
+
+  @JsonIgnore
+  public boolean canPushdownProjects(final List<SchemaPath> columns) {
+    return true;
+  }
+
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
new file mode 100644
index 0000000..3b7697d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.base.Stopwatch;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A RecordReader implementation for Avro data files.
+ *
+ * @see RecordReader
+ */
+public class AvroRecordReader extends AbstractRecordReader {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
+
+  private final Path hadoop;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+
+  private DataFileReader<GenericContainer> reader = null;
+  private OperatorContext operatorContext;
+
+  private static final int DEFAULT_BATCH_SIZE = 1000;
+
+
+  public AvroRecordReader(final FragmentContext fragmentContext,
+                          final String inputPath,
+                          final FileSystem fileSystem,
+                          final List<SchemaPath> projectedColumns) {
+    this(fragmentContext, inputPath, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
+  }
+
+  public AvroRecordReader(final FragmentContext fragmentContext,
+                          final String inputPath,
+                          final FileSystem fileSystem,
+                          List<SchemaPath> projectedColumns, final int defaultBatchSize) {
+
+    hadoop = new Path(inputPath);
+    buffer = fragmentContext.getManagedBuffer();
+
+    setColumns(projectedColumns);
+  }
+
+  @Override
+  public void setup(final OutputMutator output) throws ExecutionSetupException {
+
+    writer = new VectorContainerWriter(output);
+
+    try {
+      reader = new DataFileReader<>(new FsInput(hadoop, new Configuration()), new GenericDatumReader<GenericContainer>());
+    } catch (IOException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public int next() {
+    final Stopwatch watch = new Stopwatch().start();
+
+    if (reader == null) {
+      throw new IllegalStateException("Avro reader is not open.");
+    }
+    if (!reader.hasNext()) {
+      return 0;
+    }
+
+    int recordCount = 0;
+    writer.allocate();
+    writer.reset();
+
+    try {
+
+      // XXX - Implement batch size
+
+      for (GenericContainer container = null; reader.hasNext(); recordCount++) {
+        writer.setPosition(recordCount);
+        container = reader.next(container);
+        processRecord(container, container.getSchema());
+      }
+
+      writer.setValueCount(recordCount);
+
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+
+    logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+    return recordCount;
+  }
+
+  private void processRecord(final GenericContainer container, final Schema schema) {
+
+    final Schema.Type type = schema.getType();
+
+    switch (type) {
+      case RECORD:
+        process(container, schema, null, new MapOrListWriter(writer.rootAsMap()));
+        break;
+      default:
+        throw new DrillRuntimeException("Root object must be record type. Found: " + type);
+    }
+  }
+
+  private void process(final Object value, final Schema schema, final String fieldName, final MapOrListWriter writer) {
+
+    writer.start();
+    final Schema.Type type = schema.getType();
+
+    switch (type) {
+      case RECORD:
+        for (final Schema.Field field : schema.getFields()) {
+
+          MapOrListWriter _writer = writer;
+          if (field.schema().getType() == Schema.Type.RECORD) {
+            _writer = writer.map(field.name());
+          }
+
+          process(((GenericRecord) value).get(field.name()), field.schema(), field.name(), _writer);
+        }
+        break;
+      case ARRAY:
+        assert fieldName != null;
+        final GenericArray array = (GenericArray) value;
+        for (final Object o : array) {
+          process(o, array.getSchema().getElementType(), fieldName, writer.list(fieldName));
+        }
+        break;
+      case FIXED:
+      case UNION:
+      case MAP:
+        throw new UnsupportedOperationException("Unimplemented type: " + type.toString());
+      case ENUM:  // Enum symbols are strings
+      case NULL:  // Treat null type as a primitive
+      default:
+        assert fieldName != null;
+
+        if (writer.isMapWriter()) {
+          SchemaPath path;
+          if (writer.map.getField().getPath().getRootSegment().getPath().equals("")) {
+            path = new SchemaPath(new PathSegment.NameSegment(fieldName));
+          } else {
+            path = writer.map.getField().getPath().getChild(fieldName);
+          }
+
+          if (!selected(path)) {
+            break;
+          }
+        }
+
+        processPrimitive(value, schema.getType(), fieldName, writer);
+        break;
+    }
+
+    writer.end();
+  }
+
+  private void processPrimitive(final Object value, final Schema.Type type, final String fieldName,
+                                final MapOrListWriter writer) {
+
+    switch (type) {
+      case STRING:
+        final Utf8 utf8 = (Utf8) value;
+        final int length = utf8.length();
+        final VarCharHolder vh = new VarCharHolder();
+        ensure(length);
+        buffer.setBytes(0, utf8.getBytes());
+        vh.buffer = buffer;
+        vh.start = 0;
+        vh.end = length;
+        writer.varChar(fieldName).write(vh);
+        break;
+      case INT:
+        final IntHolder ih = new IntHolder();
+        ih.value = (Integer) value;
+        writer.integer(fieldName).write(ih);
+        break;
+      case LONG:
+        final BigIntHolder bh = new BigIntHolder();
+        bh.value = (Long) value;
+        writer.bigInt(fieldName).write(bh);
+        break;
+      case FLOAT:
+        final Float4Holder fh = new Float4Holder();
+        fh.value = (Float) value;
+        writer.float4(fieldName).write(fh);
+        break;
+      case DOUBLE:
+        final Float8Holder f8h = new Float8Holder();
+        f8h.value = (Double) value;
+        writer.float8(fieldName).write(f8h);
+        break;
+      case BOOLEAN:
+        final BitHolder bit = new BitHolder();
+        bit.value = (Boolean) value ? 1 : 0;
+        writer.bit(fieldName).write(bit);
+        break;
+      case BYTES:
+        // XXX - Not sure if this is correct. Nothing prints from sqlline for byte fields.
+        final VarBinaryHolder vb = new VarBinaryHolder();
+        final ByteBuffer buf = (ByteBuffer) value;
+        final byte[] bytes = buf.array();
+        ensure(bytes.length);
+        buffer.setBytes(0, bytes);
+        vb.buffer = buffer;
+        vb.start = 0;
+        vb.end = bytes.length;
+        writer.binary(fieldName).write(vb);
+        break;
+      case NULL:
+        // Nothing to do for null type
+        break;
+      case ENUM:
+        final String symbol = value.toString();
+        final byte[] b;
+        try {
+          b = symbol.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+          throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e);
+        }
+        final VarCharHolder vch = new VarCharHolder();
+        ensure(b.length);
+        buffer.setBytes(0, b);
+        vch.buffer = buffer;
+        vch.start = 0;
+        vch.end = b.length;
+        writer.varChar(fieldName).write(vch);
+        break;
+      default:
+        throw new DrillRuntimeException("Unhandled Avro type: " + type.toString());
+    }
+  }
+
+  private boolean selected(SchemaPath field) {
+    if (isStarQuery()) {
+      return true;
+    }
+    for (final SchemaPath sp : getColumns()) {
+      if (sp.contains(field)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void ensure(final int length) {
+    buffer = buffer.reallocIfNeeded(length);
+  }
+
+  @Override
+  public void cleanup() {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        logger.warn("Error closing Avro reader", e);
+      } finally {
+        reader = null;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
new file mode 100644
index 0000000..42c8e99
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroScanBatchCreator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+/**
+ * Batch creator for Avro scans.
+ */
+public class AvroScanBatchCreator implements BatchCreator<AvroSubScan> {
+
+
+  @Override
+  public RecordBatch getBatch(final FragmentContext context, final AvroSubScan subScan,
+                              final List<RecordBatch> children) throws ExecutionSetupException {
+
+    Preconditions.checkArgument(children.isEmpty());
+    List<SchemaPath> columns = subScan.getColumns();
+    List<RecordReader> readers = Lists.newArrayList();
+
+    readers.add(new AvroRecordReader(context, subScan.getEntry().getPath(), subScan.getFileSystem(), columns));
+
+    return new ScanBatch(subScan, context, readers.iterator());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
new file mode 100644
index 0000000..0a579aa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSubScan.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.google.common.collect.Iterators;
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Contains information for reading a single Avro row group from HDFS.
+ */
+@JsonTypeName("avro-sub-scan")
+public class AvroSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroSubScan.class);
+
+  private final AvroFormatPlugin formatPlugin;
+  private final AvroFormatConfig formatConfig;
+  private final List<SchemaPath> columns;
+  private final String selectionRoot;
+
+  private ReadEntryWithPath entry;
+  private FileSystem fs;
+
+  @JsonCreator
+  public AvroSubScan(@JacksonInject final StoragePluginRegistry registry,
+                     @JsonProperty("storage") final StoragePluginConfig storageConfig,
+                     @JsonProperty("format") final FormatPluginConfig formatConfig,
+                     @JsonProperty("columns") final List<SchemaPath> columns,
+                     @JsonProperty("selectionRoot") final String selectionRoot) throws ExecutionSetupException {
+    this((AvroFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+            formatConfig == null ? new AvroFormatConfig() : formatConfig), columns, selectionRoot);
+  }
+
+  public AvroSubScan(final AvroFormatPlugin formatPlugin, final List<SchemaPath> columns,
+                     final String selectionRoot) {
+    this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
+    this.formatConfig = formatPlugin.getConfig();
+    this.columns = columns;
+    this.selectionRoot = selectionRoot;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public AvroFormatConfig getFormatConfig() {
+    return formatConfig;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(final PhysicalVisitor<T, X, E> physicalVisitor, final X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new AvroSubScan(formatPlugin, columns, selectionRoot);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return UserBitShared.CoreOperatorType.AVRO_ROW_GROUP_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  @JsonIgnore
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  /*
+  public static class AvroSubScanSpec {
+
+  }
+  */
+
+  /** XXX - temp hacks **/
+
+  @JsonIgnore
+  public void setEntry(ReadEntryWithPath entry) {
+    this.entry = entry;
+  }
+
+  @JsonIgnore
+  public ReadEntryWithPath getEntry() {
+    return entry;
+  }
+
+  @JsonIgnore
+  public void setFileSystem(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @JsonIgnore
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
new file mode 100644
index 0000000..aeb659f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.Types;
+
+/**
+ * Utility class for working with Avro data types.
+ */
+public final class AvroTypeHelper {
+
+  // XXX - Decide what to do about Avro's NULL type
+  /*
+  public static final MajorType MAJOR_TYPE_NULL_OPTIONAL      = Types.optional(MinorType.NULL);
+  public static final MajorType MAJOR_TYPE_NULL_REQUIRED      = Types.required(MinorType.NULL);
+  public static final MajorType MAJOR_TYPE_NULL_REPEATED      = Types.repeated(MinorType.NULL);
+  */
+  public static final MajorType MAJOR_TYPE_BOOL_OPTIONAL      = Types.optional(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_BOOL_REQUIRED      = Types.required(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_BOOL_REPEATED      = Types.repeated(MinorType.UINT1);
+  public static final MajorType MAJOR_TYPE_INT_OPTIONAL       = Types.optional(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_INT_REQUIRED       = Types.required(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_INT_REPEATED       = Types.repeated(MinorType.INT);
+  public static final MajorType MAJOR_TYPE_BIGINT_OPTIONAL    = Types.optional(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_BIGINT_REQUIRED    = Types.required(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_BIGINT_REPEATED    = Types.repeated(MinorType.BIGINT);
+  public static final MajorType MAJOR_TYPE_FLOAT4_OPTIONAL    = Types.optional(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT4_REQUIRED    = Types.required(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT4_REPEATED    = Types.repeated(MinorType.FLOAT4);
+  public static final MajorType MAJOR_TYPE_FLOAT8_OPTIONAL    = Types.optional(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_FLOAT8_REQUIRED    = Types.required(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_FLOAT8_REPEATED    = Types.repeated(MinorType.FLOAT8);
+  public static final MajorType MAJOR_TYPE_VARBINARY_OPTIONAL = Types.optional(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARBINARY_REQUIRED = Types.required(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARBINARY_REPEATED = Types.repeated(MinorType.VARBINARY);
+  public static final MajorType MAJOR_TYPE_VARCHAR_OPTIONAL   = Types.optional(MinorType.VARCHAR);
+  public static final MajorType MAJOR_TYPE_VARCHAR_REQUIRED   = Types.required(MinorType.VARCHAR);
+  public static final MajorType MAJOR_TYPE_VARCHAR_REPEATED   = Types.repeated(MinorType.VARCHAR);
+
+
+  private static final String UNSUPPORTED = "Unsupported type: %s [%s]";
+
+  private AvroTypeHelper() { }
+
+  /**
+   * Maintains a mapping between Avro types and Drill types. Given an Avro data
+   * type, this method will return the corresponding Drill field major type.
+   *
+   * @param field   Avro field
+   * @return        Major type or null if no corresponding type
+   */
+  public static MajorType getFieldMajorType(final Field field, final DataMode mode) {
+    return getFieldMajorType(field.schema().getType(), mode);
+  }
+
+  /**
+   * Maintains a mapping between Avro types and Drill types. Given an Avro data
+   * type, this method will return the corresponding Drill field major type.
+   *
+   * @param type Avro type
+   * @param mode Data mode
+   * @return     Drill major type or null if no corresponding type
+   */
+  public static MajorType getFieldMajorType(final Type type, final DataMode mode) {
+
+    switch (type) {
+      case MAP:
+      case RECORD:
+      case ENUM:
+      case UNION:
+        throw new UnsupportedOperationException("Complex types are unimplemented");
+      case NULL:
+        /*
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_NULL_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_NULL_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_NULL_REPEATED;
+        }
+        break;
+        */
+        throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
+      case ARRAY:
+        break;
+      case BOOLEAN:
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_BOOL_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_BOOL_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_BOOL_REPEATED;
+        }
+        break;
+      case INT:
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_INT_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_INT_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_INT_REPEATED;
+        }
+        break;
+      case LONG:
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_BIGINT_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_BIGINT_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_BIGINT_REPEATED;
+        }
+        break;
+      case FLOAT:
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_FLOAT4_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_FLOAT4_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_FLOAT4_REPEATED;
+        }
+        break;
+      case DOUBLE:
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_FLOAT8_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_FLOAT8_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_FLOAT8_REPEATED;
+        }
+        break;
+      case BYTES:
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_VARBINARY_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_VARBINARY_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_VARBINARY_REPEATED;
+        }
+        break;
+      case STRING:
+        switch (mode) {
+          case OPTIONAL:
+            return MAJOR_TYPE_VARCHAR_OPTIONAL;
+          case REQUIRED:
+            return MAJOR_TYPE_VARCHAR_REQUIRED;
+          case REPEATED:
+            return MAJOR_TYPE_VARCHAR_REPEATED;
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
+    }
+
+    throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
new file mode 100644
index 0000000..d2a1031
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+
+/**
+ * Impersonates a map writer or a list writer depending on construction type.
+ * Perhaps this is a tragic misuse of polymorphism?
+ */
+public class MapOrListWriter {
+
+  final BaseWriter.MapWriter map;
+  final BaseWriter.ListWriter list;
+
+  MapOrListWriter(final BaseWriter.MapWriter writer) {
+    this.map = writer;
+    this.list = null;
+  }
+
+  MapOrListWriter(final BaseWriter.ListWriter writer) {
+    this.map = null;
+    this.list = writer;
+  }
+
+  void start() {
+    if (map != null) {
+      map.start();
+    } else {
+      list.start();
+    }
+  }
+
+  void end() {
+    if (map != null) {
+      map.end();
+    } else {
+      list.end();
+    }
+  }
+
+  MapOrListWriter map(final String name) {
+    assert map != null;
+    return new MapOrListWriter(map.map(name));
+  }
+
+  MapOrListWriter list(final String name) {
+    assert map != null;
+    return new MapOrListWriter(map.list(name));
+  }
+
+  boolean isMapWriter() {
+    return map != null;
+  }
+
+  VarCharWriter varChar(final String name) {
+    return (map != null) ? map.varChar(name) : list.varChar();
+  }
+
+  IntWriter integer(final String name) {
+    return (map != null) ? map.integer(name) : list.integer();
+  }
+
+  BigIntWriter bigInt(final String name) {
+    return (map != null) ? map.bigInt(name) : list.bigInt();
+  }
+
+  Float4Writer float4(final String name) {
+    return (map != null) ? map.float4(name) : list.float4();
+  }
+
+  Float8Writer float8(final String name) {
+    return (map != null) ? map.float8(name) : list.float8();
+  }
+
+  BitWriter bit(final String name) {
+    return (map != null) ? map.bit(name) : list.bit();
+  }
+
+  VarBinaryWriter binary(final String name) {
+    return (map != null) ? map.varBinary(name) : list.varBinary();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 6bf1872..3253e80 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -34,6 +34,9 @@
         },
         "json" : {
           type: "json"
+        },
+        "avro" : {
+          type: "avro"
         }
       }
     },
@@ -57,6 +60,9 @@
         },
         "parquet" : {
           type: "parquet"
+        },
+        "avro" : {
+          type: "avro"
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
new file mode 100644
index 0000000..2d2522b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.BaseTestQuery;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for Avro record reader.
+ */
+public class AvroFormatTest extends BaseTestQuery {
+
+  // XXX
+  //      1. Need to test nested field names with same name as top-level names for conflict.
+  //      2. Avro supports linked lists, right? Can we test this?
+  //      3. Avro supports recursive types? Can we test this?
+  //      4. Test queries with not all columns projected.
+
+  @Test
+  public void testSimplePrimitiveSchema_NoNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String sql =
+            "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null " +
+             "from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_StarQuery() throws Exception {
+
+    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception {
+
+    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String sql = "select h_boolean, e_double from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimpleArraySchema_NoNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateSimpleArraySchema_NoNullValues();
+    final String sql = "select a_string, c_string_array[0], e_float_array[2] " +
+            "from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimpleArraySchema_StarQuery() throws Exception {
+
+    final String file = AvroTestUtil.generateSimpleArraySchema_NoNullValues();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimpleNestedSchema_NoNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateSimpleNestedSchema_NoNullValues();
+    final String sql = "select a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int'] " +
+            "from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimpleNestedSchema_StarQuery() throws Exception {
+
+    final String file = AvroTestUtil.generateSimpleNestedSchema_NoNullValues();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testDoubleNestedSchema_NoNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues();
+    final String sql = "select a_string, b_int, c_record['nested_1_string'], c_record['nested_1_int'], " +
+            "c_record['nested_1_record']['double_nested_1_string'], " +
+            "c_record['nested_1_record']['double_nested_1_int'] " +
+            "from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testDoubleNestedSchema_StarQuery() throws Exception {
+
+    final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimpleEnumSchema_NoNullValues() throws Exception {
+
+    final String file = AvroTestUtil.generateSimpleEnumSchema_NoNullValues();
+    final String sql = "select a_string, b_enum from dfs_test.`" + file + "`";
+    test(sql);
+  }
+
+  @Test
+  public void testSimpleEnumSchema_StarQuery() throws Exception {
+
+    final String file = AvroTestUtil.generateSimpleEnumSchema_NoNullValues();
+    final String sql = "select * from dfs_test.`" + file + "`";
+    test(sql);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
new file mode 100644
index 0000000..419c054
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for generating Avro test data.
+ */
+public class AvroTestUtil {
+
+  public static final int RECORD_COUNT = 10;
+
+  public static String generateSimplePrimitiveSchema_NoNullValues() throws Exception {
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_long").type().longType().noDefault()
+            .name("d_float").type().floatType().noDefault()
+            .name("e_double").type().doubleType().noDefault()
+            .name("f_bytes").type().bytesType().noDefault()
+            .name("g_null").type().nullType().noDefault()
+            .name("h_boolean").type().booleanType().noDefault()
+            .endRecord();
+
+    final File file = File.createTempFile("avro-primitive-test", ".avro");
+    file.deleteOnExit();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    try {
+      writer.create(schema, file);
+
+      ByteBuffer bb = ByteBuffer.allocate(1);
+      bb.put(0, (byte) 1);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+        record.put("c_long", (long) i);
+        record.put("d_float", (float) i);
+        record.put("e_double", (double) i);
+        record.put("f_bytes", bb);
+        record.put("g_null", null);
+        record.put("h_boolean", (i % 2 == 0));
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  public static String generateSimpleEnumSchema_NoNullValues() throws Exception {
+
+    final String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_enum").type().enumeration("my_enum").symbols(symbols).noDefault()
+            .endRecord();
+
+    final File file = File.createTempFile("avro-primitive-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema enumSchema = schema.getField("b_enum").schema();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+
+    try {
+      writer.create(schema, file);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        final GenericData.EnumSymbol symbol =
+                new GenericData.EnumSymbol(enumSchema, symbols[(i + symbols.length) % symbols.length]);
+        record.put("b_enum", symbol);
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  public static String generateSimpleArraySchema_NoNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-array-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_string_array").type().array().items().stringType().noDefault()
+            .name("d_int_array").type().array().items().intType().noDefault()
+            .name("e_float_array").type().array().items().floatType().noDefault()
+            .endRecord();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    try {
+      writer.create(schema, file);
+
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        GenericArray array = new GenericData.Array<String>(RECORD_COUNT, schema.getField("c_string_array").schema());
+        for (int j = 0; j < RECORD_COUNT; j++) {
+          array.add(j, "c_string_array_" + i + "_" + j);
+        }
+        record.put("c_string_array", array);
+
+        array = new GenericData.Array<String>(RECORD_COUNT, schema.getField("d_int_array").schema());
+        for (int j = 0; j < RECORD_COUNT; j++) {
+          array.add(j, i * j);
+        }
+        record.put("d_int_array", array);
+
+        array = new GenericData.Array<String>(RECORD_COUNT, schema.getField("e_float_array").schema());
+        for (int j = 0; j < RECORD_COUNT; j++) {
+          array.add(j, (float) (i * j));
+        }
+        record.put("e_float_array", array);
+
+        writer.append(record);
+      }
+
+    } finally {
+      writer.close();
+    }
+    return file.getAbsolutePath();
+  }
+
+  public static String generateSimpleNestedSchema_NoNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_record").type().record("my_record_1")
+              .namespace("foo.blah.org")
+              .fields()
+              .name("nested_1_string").type().stringType().noDefault()
+              .name("nested_1_int").type().intType().noDefault()
+              .endRecord()
+            .noDefault()
+            .endRecord();
+
+    final Schema nestedSchema = schema.getField("c_record").schema();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    writer.create(schema, file);
+
+    try {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+        nestedRecord.put("nested_1_int", i * i);
+
+        record.put("c_record", nestedRecord);
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  public static String generateDoubleNestedSchema_NoNullValues() throws Exception {
+
+    final File file = File.createTempFile("avro-double-nested-test", ".avro");
+    file.deleteOnExit();
+
+    final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+            .namespace("org.apache.drill.exec.store.avro")
+            .fields()
+            .name("a_string").type().stringType().noDefault()
+            .name("b_int").type().intType().noDefault()
+            .name("c_record").type().record("my_record_1")
+              .namespace("foo.blah.org")
+              .fields()
+              .name("nested_1_string").type().stringType().noDefault()
+              .name("nested_1_int").type().intType().noDefault()
+              .name("nested_1_record").type().record("my_double_nested_record_1")
+                .namespace("foo.blah.org.rot")
+                .fields()
+                .name("double_nested_1_string").type().stringType().noDefault()
+                .name("double_nested_1_int").type().intType().noDefault()
+                .endRecord()
+                .noDefault()
+              .endRecord()
+              .noDefault()
+            .endRecord();
+
+    final Schema nestedSchema = schema.getField("c_record").schema();
+    final Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema();
+
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    writer.create(schema, file);
+
+    try {
+      for (int i = 0; i < RECORD_COUNT; i++) {
+        final GenericRecord record = new GenericData.Record(schema);
+        record.put("a_string", "a_" + i);
+        record.put("b_int", i);
+
+        final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+        nestedRecord.put("nested_1_string", "nested_1_string_" +  i);
+        nestedRecord.put("nested_1_int", i * i);
+
+        final GenericRecord doubleNestedRecord = new GenericData.Record(doubleNestedSchema);
+        doubleNestedRecord.put("double_nested_1_string", "double_nested_1_string_" + i + "_" + i);
+        doubleNestedRecord.put("double_nested_1_int", i * i * i);
+
+        nestedRecord.put("nested_1_record", doubleNestedRecord);
+        record.put("c_record", nestedRecord);
+
+        writer.append(record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file.getAbsolutePath();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index e9772cf..d4d81f6 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -38,6 +38,9 @@
         "txt" : {
           type : "text",
           extensions: [ "txt" ]
+        },
+        "avro" : {
+          type: "avro"
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 4b4e558..9a095aa 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -474,6 +474,10 @@ public final class UserBitShared {
      * <code>NESTED_LOOP_JOIN = 35;</code>
      */
     NESTED_LOOP_JOIN(35, 35),
+    /**
+     * <code>AVRO_SUB_SCAN = 36;</code>
+     */
+    AVRO_SUB_SCAN(36, 36),
     ;
 
     /**
@@ -620,6 +624,10 @@ public final class UserBitShared {
      * <code>NESTED_LOOP_JOIN = 35;</code>
      */
     public static final int NESTED_LOOP_JOIN_VALUE = 35;
+    /**
+     * <code>AVRO_SUB_SCAN = 36;</code>
+     */
+    public static final int AVRO_SUB_SCAN_VALUE = 36;
 
 
     public final int getNumber() { return value; }
@@ -662,6 +670,7 @@ public final class UserBitShared {
         case 33: return HBASE_SUB_SCAN;
         case 34: return WINDOW;
         case 35: return NESTED_LOOP_JOIN;
+        case 36: return AVRO_SUB_SCAN;
         default: return null;
       }
     }
@@ -19856,7 +19865,7 @@ public final class UserBitShared {
       "yType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020" +
       "\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
       "ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
-      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\312\005\n\020CoreO" +
+      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\335\005\n\020CoreO" +
       "peratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADC" +
       "AST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGA" +
       "TE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025H",
@@ -19874,8 +19883,9 @@ public final class UserBitShared {
       "AN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_S" +
       "UB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUC" +
       "ER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIN" +
-      "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#B.\n\033org.apach" +
-      "e.drill.exec.protoB\rUserBitSharedH\001"
+      "DOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_" +
+      "SCAN\020$B.\n\033org.apache.drill.exec.protoB\rU" +
+      "serBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index a37209d..b21d3ae 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -57,7 +57,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     PRODUCER_CONSUMER(32),
     HBASE_SUB_SCAN(33),
     WINDOW(34),
-    NESTED_LOOP_JOIN(35);
+    NESTED_LOOP_JOIN(35),
+    AVRO_SUB_SCAN(36);
     
     public final int number;
     
@@ -111,6 +112,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 33: return HBASE_SUB_SCAN;
             case 34: return WINDOW;
             case 35: return NESTED_LOOP_JOIN;
+            case 36: return AVRO_SUB_SCAN;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/55a9a59d/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 0f86958..10c2790 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -243,4 +243,5 @@ enum CoreOperatorType {
   HBASE_SUB_SCAN = 33;
   WINDOW = 34;
   NESTED_LOOP_JOIN = 35;
+  AVRO_SUB_SCAN = 36;
 }


Mime
View raw message