drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [1/5] drill git commit: DRILL-2275: Added support to get information about current cluster memory and threads
Date Thu, 26 Mar 2015 15:59:16 GMT
Repository: drill
Updated Branches:
  refs/heads/master 7bded6d3c -> 50ad974e9


DRILL-2275: Added support to get information about current cluster memory and threads

+ SystemRecordReader reads a SystemRecord e.g. MemoryRecord
+ Added generic data type for static tables
+ GroupScan can enforce width to be maximum width on ExcessiveExchangeRemover
+ GroupScan has minimum width for SimpleParallelizer


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8ab361b1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8ab361b1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8ab361b1

Branch: refs/heads/master
Commit: 8ab361b123314b225733603560de36c170bbc117
Parents: 7bded6d
Author: Sudheesh Katkam <skatkam@maprtech.com>
Authored: Fri Mar 6 17:07:31 2015 -0800
Committer: Sudheesh Katkam <skatkam@maprtech.com>
Committed: Wed Mar 25 18:16:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/JSONOptions.java    |  24 +++-
 .../drill/exec/memory/TopLevelAllocator.java    |   3 +
 .../exec/physical/base/AbstractGroupScan.java   |  14 +-
 .../drill/exec/physical/base/GroupScan.java     |  16 +++
 .../planner/fragment/ParallelizationInfo.java   |  10 +-
 .../planner/fragment/SimpleParallelizer.java    |   4 +-
 .../drill/exec/planner/fragment/Stats.java      |   4 +
 .../exec/planner/fragment/StatsCollector.java   |   4 +-
 .../visitor/ExcessiveExchangeIdentifier.java    |   7 +
 .../apache/drill/exec/store/RecordDataType.java |  65 +++++++++
 .../drill/exec/store/pojo/PojoDataType.java     |  42 +++---
 .../drill/exec/store/sys/DrillbitIterator.java  |   4 +-
 .../drill/exec/store/sys/MemoryRecord.java      | 141 +++++++++++++++++++
 .../drill/exec/store/sys/StaticDrillTable.java  |  19 +--
 .../drill/exec/store/sys/SystemRecord.java      |  44 ++++++
 .../exec/store/sys/SystemRecordReader.java      |  77 ++++++++++
 .../drill/exec/store/sys/SystemTable.java       |  93 +++++++++---
 .../exec/store/sys/SystemTableBatchCreator.java |  29 +++-
 .../drill/exec/store/sys/SystemTablePlugin.java |  68 ++++-----
 .../exec/store/sys/SystemTablePluginConfig.java |  12 +-
 .../drill/exec/store/sys/SystemTableScan.java   |  56 ++++++--
 .../drill/exec/store/sys/ThreadsRecord.java     | 119 ++++++++++++++++
 .../drill/exec/store/sys/TestSystemTable.java   |  12 ++
 23 files changed, 749 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/common/src/main/java/org/apache/drill/common/JSONOptions.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/JSONOptions.java b/common/src/main/java/org/apache/drill/common/JSONOptions.java
index 64e6d52..945cd92 100644
--- a/common/src/main/java/org/apache/drill/common/JSONOptions.java
+++ b/common/src/main/java/org/apache/drill/common/JSONOptions.java
@@ -48,7 +48,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 @JsonDeserialize(using = De.class)
 public class JSONOptions {
 
-  final static Logger logger = LoggerFactory.getLogger(JSONOptions.class);
+  private final static Logger logger = LoggerFactory.getLogger(JSONOptions.class);
 
   private JsonNode root;
   private JsonLocation location;
@@ -67,17 +67,30 @@ public class JSONOptions {
   public <T> T getWith(DrillConfig config, Class<T> c) {
     try {
       if (opaque != null) {
-        if (opaque.getClass().equals(c)) {
+        final Class<?> opaqueClass = opaque.getClass();
+        if (opaqueClass.equals(c)) {
           return (T) opaque;
         } else {
-          throw new IllegalArgumentException(String.format("Attmpted to retrieve a option with type of %s.  However, the JSON options carried an opaque value of type %s.", c.getName(), opaque.getClass().getName()));
+          // Enum values that override methods are given $1, $2 ... extensions. Ignore the extension.
+          // e.g. SystemTable$1 for SystemTable.OPTION
+          if (c.isEnum()) {
+            final String opaqueName = opaqueClass.getName().replaceAll("\\$\\d+$", "");
+            final String cName = c.getName();
+            if(opaqueName.equals(cName)) {
+              return (T) opaque;
+            }
+          }
+          throw new IllegalArgumentException(String.format("Attempted to retrieve a option with type of %s.  " +
+            "However, the JSON options carried an opaque value of type %s.", c.getName(), opaqueClass.getName()));
         }
       }
 
       //logger.debug("Read tree {}", root);
       return config.getMapper().treeToValue(root, c);
     } catch (JsonProcessingException e) {
-      throw new LogicalPlanParsingException(String.format("Failure while trying to convert late bound json options to type of %s. Reference was originally located at line %d, column %d.", c.getCanonicalName(), location.getLineNr(), location.getColumnNr()), e);
+      throw new LogicalPlanParsingException(String.format("Failure while trying to convert late bound " +
+        "json options to type of %s. Reference was originally located at line %d, column %d.",
+        c.getCanonicalName(), location.getLineNr(), location.getColumnNr()), e);
     }
   }
 
@@ -95,7 +108,8 @@ public class JSONOptions {
       if ( c.equals(opaque.getClass())) {
         return (T) opaque;
       } else {
-        throw new IOException(String.format("Attmpted to retrieve a list with type of %s.  However, the JSON options carried an opaque value of type %s.", t.getType(), opaque.getClass().getName()));
+        throw new IOException(String.format("Attempted to retrieve a list with type of %s.  However, the JSON " +
+          "options carried an opaque value of type %s.", t.getType(), opaque.getClass().getName()));
       }
     }
     if (root == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index af8c1dc..d22651e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -37,6 +37,8 @@ import org.apache.drill.exec.util.Pointer;
 public class TopLevelAllocator implements BufferAllocator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
 
+  public static long MAXIMUM_DIRECT_MEMORY;
+
   private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
   private final Map<ChildAllocator, StackTraceElement[]> childrenMap;
   private final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT;
@@ -56,6 +58,7 @@ public class TopLevelAllocator implements BufferAllocator {
   }
 
   private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){
+    MAXIMUM_DIRECT_MEMORY = maximumAllocation;
     this.config=(config!=null) ? config : DrillConfig.create();
     this.errorOnLeak = errorOnLeak;
     this.acct = new Accountor(config, errorOnLeak, null, null, maximumAllocation, 0, true);

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 276ecb5..8fe21e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Iterators;
 import org.apache.drill.exec.physical.EndpointAffinity;
 
 public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
 
   @Override
   public Iterator<PhysicalOperator> iterator() {
@@ -57,6 +57,18 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
 
   @Override
   @JsonIgnore
+  public int getMinParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  @JsonIgnore
+  public boolean enforceWidth() {
+    return false;
+  }
+
+  @Override
+  @JsonIgnore
   public long getInitialAllocation() {
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 23860a3..60b8330 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -45,6 +45,22 @@ public interface GroupScan extends Scan, HasAffinity{
   public int getMaxParallelizationWidth();
 
   /**
+   * At minimum, the GroupScan requires these many fragments to run.
+   * Currently, this is used in {@link org.apache.drill.exec.planner.fragment.SimpleParallelizer}
+   * @return the minimum number of fragments that should run
+   */
+  @JsonIgnore
+  public int getMinParallelizationWidth();
+
+  /**
+   * Check if GroupScan enforces width to be maximum parallelization width.
+   * Currently, this is used in {@link org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier}
+   * @return if maximum width should be enforced
+   */
+  @JsonIgnore
+  public boolean enforceWidth();
+
+  /**
    * Returns a signature of the {@link GroupScan} which should usually be composed of
    * all its attributes which could describe it uniquely.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
index 75a009e..8e775af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
@@ -96,8 +96,8 @@ public class ParallelizationInfo {
     private final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap();
 
     public void add(ParallelizationInfo parallelizationInfo) {
-      this.minWidth = Math.max(minWidth, parallelizationInfo.minWidth);
-      this.maxWidth = Math.min(maxWidth, parallelizationInfo.maxWidth);
+      minWidth = Math.max(minWidth, parallelizationInfo.minWidth);
+      maxWidth = Math.min(maxWidth, parallelizationInfo.maxWidth);
 
       Map<DrillbitEndpoint, EndpointAffinity> affinityMap = parallelizationInfo.getEndpointAffinityMap();
       for(Map.Entry<DrillbitEndpoint, EndpointAffinity> epAff : affinityMap.entrySet()) {
@@ -106,7 +106,11 @@ public class ParallelizationInfo {
     }
 
     public void addMaxWidth(int newMaxWidth) {
-      this.maxWidth = Math.min(maxWidth, newMaxWidth);
+      maxWidth = Math.min(maxWidth, newMaxWidth);
+    }
+
+    public void addMinWidth(int newMinWidth) {
+      minWidth = Math.max(minWidth, newMinWidth);
     }
 
     public void addEndpointAffinities(List<EndpointAffinity> endpointAffinities) {

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 12043ce..66ba229 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -264,8 +264,8 @@ public class SimpleParallelizer {
       }
 
       if (width < numRequiredNodes) {
-        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width is " +
-            "less than the number of mandatory nodes (nodes with +INFINITE affinity).");
+        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is " +
+            "less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
       }
 
       // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
index e61b38f..b5b8ce4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
@@ -39,6 +39,10 @@ public class Stats {
     collector.addMaxWidth(maxWidth);
   }
 
+  public void addMinWidth(int minWidth) {
+    collector.addMinWidth(minWidth);
+  }
+
   public void addEndpointAffinities(List<EndpointAffinity> endpointAffinityList) {
     collector.addEndpointAffinities(endpointAffinityList);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 1f56556..4f4e0b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -85,7 +85,9 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
 
   @Override
   public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) {
-    wrapper.getStats().addMaxWidth(groupScan.getMaxParallelizationWidth());
+    final Stats stats = wrapper.getStats();
+    stats.addMaxWidth(groupScan.getMaxParallelizationWidth());
+    stats.addMinWidth(groupScan.getMinParallelizationWidth());
     return super.visitGroupScan(groupScan, wrapper);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index a237014..9d74802 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -94,6 +94,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
   class MajorFragmentStat {
     private double maxRows = 0d;
     private int maxWidth = Integer.MAX_VALUE;
+    private boolean enforceWidth = false;
 
     public void add(Prel prel) {
       maxRows = Math.max(prel.getRows(), maxRows);
@@ -105,10 +106,16 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
 
     public void addScan(ScanPrel prel) {
       maxWidth = Math.min(maxWidth, prel.getGroupScan().getMaxParallelizationWidth());
+      enforceWidth = prel.getGroupScan().enforceWidth();
       add(prel);
     }
 
     public boolean isSingular() {
+      // do not remove exchanges when a scan enforces width (e.g. SystemTableScan)
+      if (enforceWidth) {
+        return false;
+      }
+
       int suggestedWidth = (int) Math.ceil((maxRows+1)/targetSliceSize);
 
       int w = Math.min(maxWidth, suggestedWidth);

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java
new file mode 100644
index 0000000..889db12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.util.List;
+
+/**
+ * RecordDataType defines names and data types of columns in a static drill table.
+ */
+public abstract class RecordDataType {
+
+  /**
+   * @return the {@link org.eigenbase.sql.type.SqlTypeName} of columns in the table
+   */
+  public abstract List<SqlTypeName> getFieldSqlTypeNames();
+
+  /**
+   * @return the column names in the table
+   */
+  public abstract List<String> getFieldNames();
+
+  /**
+   * This method constructs a {@link org.eigenbase.reltype.RelDataType} based on the
+   * {@link org.apache.drill.exec.store.RecordDataType}'s field sql types and field names.
+   *
+   * @param factory helps construct a {@link org.eigenbase.reltype.RelDataType}
+   * @return the constructed type
+   */
+  public final RelDataType getRowType(RelDataTypeFactory factory) {
+    final List<SqlTypeName> types = getFieldSqlTypeNames();
+    final List<String> names = getFieldNames();
+    final List<RelDataType> fields = Lists.newArrayList();
+    for (final SqlTypeName typeName : types) {
+      switch (typeName) {
+        case VARCHAR:
+          fields.add(factory.createSqlType(typeName, Integer.MAX_VALUE));
+          break;
+        default:
+          fields.add(factory.createSqlType(typeName));
+      }
+    }
+    return factory.createStructType(fields, names);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
index c1e64e6..2acb727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
@@ -22,24 +22,24 @@ import java.lang.reflect.Modifier;
 import java.sql.Timestamp;
 import java.util.List;
 
-import org.eigenbase.reltype.RelDataType;
-import org.eigenbase.reltype.RelDataTypeFactory;
+import org.apache.drill.exec.store.RecordDataType;
 import org.eigenbase.sql.type.SqlTypeName;
 
 import com.google.common.collect.Lists;
 
-public class PojoDataType {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
+/**
+ * This class uses reflection of a Java class to construct a {@link org.apache.drill.exec.store.RecordDataType}.
+ */
+public class PojoDataType extends RecordDataType {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
 
-  public List<SqlTypeName> types = Lists.newArrayList();
-  public List<String> names = Lists.newArrayList();
+  private final List<SqlTypeName> types = Lists.newArrayList();
+  private final List<String> names = Lists.newArrayList();
+  private final Class<?> pojoClass;
 
   public PojoDataType(Class<?> pojoClass) {
-    logger.debug(pojoClass.getName());
-    Field[] fields = pojoClass.getDeclaredFields();
-    for (int i = 0; i < fields.length; i++) {
-      Field f = fields[i];
-
+    this.pojoClass = pojoClass;
+    for (Field f : pojoClass.getDeclaredFields()) {
       if (Modifier.isStatic(f.getModifiers())) {
         continue;
       }
@@ -62,17 +62,23 @@ public class PojoDataType {
       } else if (type == Timestamp.class) {
         types.add(SqlTypeName.TIMESTAMP);
       } else {
-        throw new RuntimeException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type));
+        throw new RuntimeException(String.format("PojoDataType doesn't yet support conversions from type [%s].", type));
       }
     }
   }
 
-  public RelDataType getRowType(RelDataTypeFactory f) {
-    List<RelDataType> fields = Lists.newArrayList();
-    for (SqlTypeName n : types) {
-      fields.add(f.createSqlType(n));
-    }
-    return f.createStructType(fields, names);
+  public Class<?> getPojoClass() {
+    return pojoClass;
+  }
+
+  @Override
+  public List<SqlTypeName> getFieldSqlTypeNames() {
+    return types;
+  }
+
+  @Override
+  public List<String> getFieldNames() {
+    return names;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
index 67d7cf9..08bc0ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -34,7 +34,7 @@ public class DrillbitIterator implements Iterator<Object> {
   }
 
   public static class DrillbitInstance {
-    public String host;
+    public String hostname;
     public int user_port;
     public int control_port;
     public int data_port;
@@ -51,7 +51,7 @@ public class DrillbitIterator implements Iterator<Object> {
     DrillbitEndpoint ep = endpoints.next();
     DrillbitInstance i = new DrillbitInstance();
     i.current = ep.equals(current);
-    i.host = ep.getAddress();
+    i.hostname = ep.getAddress();
     i.user_port = ep.getUserPort();
     i.control_port = ep.getControlPort();
     i.data_port = ep.getDataPort();

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
new file mode 100644
index 0000000..9cb001d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.List;
+
+/**
+ * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit memory
+ */
+public class MemoryRecord extends SystemRecord {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryRecord.class);
+
+  private static final MemoryRecord INSTANCE = new MemoryRecord();
+
+  public static SystemRecord getInstance() {
+    return INSTANCE;
+  }
+
+  private static final String HOST_NAME = "hostname";
+  private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
+    Types.required(TypeProtos.MinorType.VARCHAR));
+  private VarCharVector hostName;
+
+  private static final String USER_PORT = "user_port";
+  private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector userPort;
+
+  private static final String CURRENT_HEAP_SIZE = "heap_current";
+  private static final MaterializedField currentHeapSizeField = MaterializedField.create(CURRENT_HEAP_SIZE,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector currentHeapSize;
+
+  private static final String MAX_HEAP_SIZE = "heap_max";
+  private static final MaterializedField maxHeapSizeField = MaterializedField.create(MAX_HEAP_SIZE,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector maxHeapSize;
+
+  private static final String CURRENT_DIRECT_MEMORY = "direct_current";
+  private static final MaterializedField currentDirectMemoryField = MaterializedField.create(CURRENT_DIRECT_MEMORY,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector currentDirectMemory;
+
+  private static final String MAX_DIRECT_MEMORY = "direct_max";
+  private static final MaterializedField maxDirectMemoryField = MaterializedField.create(MAX_DIRECT_MEMORY,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector maxDirectMemory;
+
+  private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
+    SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT);
+
+  private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, CURRENT_HEAP_SIZE, MAX_HEAP_SIZE,
+    CURRENT_DIRECT_MEMORY, MAX_DIRECT_MEMORY);
+
+  private MemoryRecord() {
+  }
+
+  @Override
+  public void setup(final OutputMutator output) throws SchemaChangeException {
+    hostName = output.addField(hostNameField, VarCharVector.class);
+    userPort = output.addField(userPortField, BigIntVector.class);
+    currentHeapSize = output.addField(currentHeapSizeField, BigIntVector.class);
+    maxHeapSize = output.addField(maxHeapSizeField, BigIntVector.class);
+    currentDirectMemory = output.addField(currentDirectMemoryField, BigIntVector.class);
+    maxDirectMemory = output.addField(maxDirectMemoryField, BigIntVector.class);
+  }
+
+  @Override
+  public void setRecordValues(final FragmentContext context) {
+    final DrillbitContext drillbitContext = context.getDrillbitContext();
+    final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+    final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+    final String address = endpoint.getAddress();
+    final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
+    hostNameMutator.setSafe(0, address.getBytes());
+    hostNameMutator.setValueCount(1);
+
+    final int port = endpoint.getUserPort();
+    final BigIntVector.Mutator userPortMutator = userPort.getMutator();
+    userPortMutator.setSafe(0, port);
+    userPortMutator.setValueCount(1);
+
+    final BigIntVector.Mutator currentHeapSizeMutator = currentHeapSize.getMutator();
+    currentHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getUsed());
+    currentHeapSizeMutator.setValueCount(1);
+
+    final BigIntVector.Mutator maxHeapSizeMutator = maxHeapSize.getMutator();
+    maxHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getMax());
+    maxHeapSizeMutator.setValueCount(1);
+
+    final BigIntVector.Mutator currentDirectMemoryMutator = currentDirectMemory.getMutator();
+    currentDirectMemoryMutator.setSafe(0, drillbitContext.getAllocator().getAllocatedMemory());
+    currentDirectMemoryMutator.setValueCount(1);
+
+    final BigIntVector.Mutator maxDirectMemoryMutator = maxDirectMemory.getMutator();
+    maxDirectMemoryMutator.setSafe(0, TopLevelAllocator.MAXIMUM_DIRECT_MEMORY);
+    maxDirectMemoryMutator.setValueCount(1);
+  }
+
+  @Override
+  public List<SqlTypeName> getFieldSqlTypeNames() {
+    return FIELDS;
+  }
+
+  @Override
+  public List<String> getFieldNames() {
+    return NAMES;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
index c1e8dd1..d9374cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
@@ -18,24 +18,27 @@
 package org.apache.drill.exec.store.sys;
 
 import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.RecordDataType;
 import org.apache.drill.exec.store.StoragePlugin;
-import org.apache.drill.exec.store.pojo.PojoDataType;
 import org.eigenbase.reltype.RelDataType;
 import org.eigenbase.reltype.RelDataTypeFactory;
 
-public class StaticDrillTable extends DrillTable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
+/**
+ * A {@link org.apache.drill.exec.planner.logical.DrillTable} with a defined schema
+ * Currently, this is a wrapper class for {@link org.apache.drill.exec.store.sys.SystemTable}.
+ */
+public class StaticDrillTable extends DrillTable {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
 
-  private final PojoDataType type;
+  private final RecordDataType dataType;
 
-  public StaticDrillTable(PojoDataType type, String storageEngineName, StoragePlugin plugin, Object selection) {
+  public StaticDrillTable(String storageEngineName, StoragePlugin plugin, Object selection, RecordDataType dataType) {
     super(storageEngineName, plugin, selection);
-    this.type = type;
+    this.dataType = dataType;
   }
 
   @Override
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-    return type.getRowType(typeFactory);
+    return dataType.getRowType(typeFactory);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
new file mode 100644
index 0000000..5bdb9b1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.RecordDataType;
+
+/**
+ * A system record holds system information (e.g. memory usage).
+ * Currently, there is only one such system record per Drillbit.
+ */
+public abstract class SystemRecord extends RecordDataType {
+
+  /**
+   * Setup value vectors to hold system information
+   * @param output the mutator from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
+   * @throws SchemaChangeException
+   */
+  public abstract void setup(OutputMutator output) throws SchemaChangeException;
+
+  /**
+   * Set the values of value vectors when requested
+   * @param context the context from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
+   */
+  public abstract void setRecordValues(FragmentContext context);
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
new file mode 100644
index 0000000..9f8d0d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+
+/**
+ * A record reader to populate a {@link SystemRecord}.
+ */
+public class SystemRecordReader extends AbstractRecordReader {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemRecordReader.class);
+
+  private final FragmentContext fragmentContext;
+  private final SystemRecord record;
+  private boolean read;
+
+  private OperatorContext operatorContext;
+
+  public SystemRecordReader(FragmentContext context, SystemRecord record) {
+    this.fragmentContext = context;
+    this.record = record;
+    this.read = false;
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try {
+      record.setup(output);
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public int next() {
+    // send only one record
+    if (!read) {
+      record.setRecordValues(fragmentContext);
+      read = true;
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public void cleanup() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 0bf2156..2c338ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -17,40 +17,93 @@
  */
 package org.apache.drill.exec.store.sys;
 
+import com.google.common.collect.Iterators;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.options.DrillConfigIterator;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.RecordDataType;
 import org.apache.drill.exec.store.pojo.PojoDataType;
-import org.eigenbase.reltype.RelDataType;
-import org.eigenbase.reltype.RelDataTypeFactory;
 
+import java.util.Iterator;
+
+/**
+ * An enumeration of all system tables that Drill supports.
+ * <p/>
+ * OPTION, DRILLBITS and VERSION are local tables available on every Drillbit.
+ * MEMORY and THREADS are distributed tables with one record on every Drillbit.
+ */
 public enum SystemTable {
-  OPTION("options", OptionValue.class),
-  DRILLBITS("drillbits", DrillbitIterator.DrillbitInstance.class),
-  VERSION("version", VersionIterator.VersionInfo.class)
-  ;
 
-  private final PojoDataType type;
+  OPTION("options", false, new PojoDataType(OptionValue.class)) {
+    @Override
+    public Iterator<Object> getLocalIterator(final FragmentContext context) {
+      final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
+      final OptionManager fragmentOptions = context.getOptions();
+      return (Iterator<Object>) (Object) Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
+    }
+  },
+
+  DRILLBITS("drillbits", false, new PojoDataType(DrillbitIterator.DrillbitInstance.class)) {
+    @Override
+    public Iterator<Object> getLocalIterator(final FragmentContext context) {
+      return new DrillbitIterator(context);
+    }
+  },
+
+  VERSION("version", false, new PojoDataType(VersionIterator.VersionInfo.class)) {
+    @Override
+    public Iterator<Object> getLocalIterator(final FragmentContext context) {
+      return new VersionIterator();
+    }
+  },
+
+  MEMORY("memory", true, MemoryRecord.getInstance()) {
+    @Override
+    public SystemRecord getSystemRecord() {
+      return MemoryRecord.getInstance();
+    }
+  },
+
+  THREADS("threads", true, ThreadsRecord.getInstance()) {
+    @Override
+    public SystemRecord getSystemRecord() {
+      return ThreadsRecord.getInstance();
+    }
+  };
+
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTable.class);
+
   private final String tableName;
-  private final Class<?> pojoClass;
+  private final boolean distributed;
+  private final RecordDataType dataType;
 
-  SystemTable(String tableName, Class<?> clazz){
-    this.type = new PojoDataType(clazz);
+  SystemTable(String tableName, boolean distributed, RecordDataType dataType) {
     this.tableName = tableName;
-    this.pojoClass = clazz;
+    this.distributed = distributed;
+    this.dataType = dataType;
   }
 
-  public String getTableName(){
-    return tableName;
+  // Distributed tables must override this method
+  public SystemRecord getSystemRecord() {
+    throw new UnsupportedOperationException("Local table does not support this function.");
   }
 
-  public RelDataType getRowType(RelDataTypeFactory f){
-    return type.getRowType(f);
+  // Local tables must override this method
+  public Iterator<Object> getLocalIterator(FragmentContext context) {
+    throw new UnsupportedOperationException("Distributed table does not support this function.");
   }
 
-  public PojoDataType getType(){
-    return type;
+  public String getTableName() {
+    return tableName;
   }
 
-  public Class<?> getPojoClass(){
-    return pojoClass;
+  public boolean isDistributed() {
+    return distributed;
   }
-}
\ No newline at end of file
+
+  public RecordDataType getDataType() {
+    return dataType;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index a1bec1e..0152b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -27,18 +27,33 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.pojo.PojoDataType;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
-public class SystemTableBatchCreator implements BatchCreator<SystemTableScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
+/**
+ * This class creates batches based on the the type of {@link org.apache.drill.exec.store.sys.SystemTable}.
+ * The distributed tables and the local tables use different record readers.
+ * Local system tables do not require a full-fledged query because these records are present on every Drillbit.
+ */
+public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
-  public RecordBatch getBatch(FragmentContext context, SystemTableScan scan, List<RecordBatch> children)
-      throws ExecutionSetupException {
-    Iterator<Object> iter = scan.getPlugin().getRecordIterator(context, scan.getTable());
-    PojoRecordReader reader = new PojoRecordReader(scan.getTable().getPojoClass(), iter);
+  public RecordBatch getBatch(final FragmentContext context, final SystemTableScan scan,
+                              final List<RecordBatch> children)
+    throws ExecutionSetupException {
+    final SystemTable table = scan.getTable();
+    final RecordReader reader;
+    if (table.isDistributed()) {
+      final SystemRecord record = table.getSystemRecord();
+      reader = new SystemRecordReader(context, record);
+    } else {
+      final Iterator<Object> iter = table.getLocalIterator(context);
+      final PojoDataType type = (PojoDataType) table.getDataType();
+      reader = new PojoRecordReader(type.getPojoClass(), iter);
+    }
 
-    return new ScanBatch(scan, context, Collections.singleton( (RecordReader) reader).iterator());
+    return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 2c70fd4..13e0ff6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -18,45 +18,51 @@
 package org.apache.drill.exec.store.sys;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import net.hydromatic.optiq.SchemaPlus;
-
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.DrillConfigIterator;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+/**
+ * A "storage" plugin for system tables.
+ */
+public class SystemTablePlugin extends AbstractStoragePlugin {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
 
-public class SystemTablePlugin extends AbstractStoragePlugin{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
+  public static final String SYS_SCHEMA_NAME = "sys";
 
   private final DrillbitContext context;
   private final String name;
+  private final SystemTablePluginConfig config;
+  private final SystemSchema schema = new SystemSchema();
 
-  public SystemTablePlugin(SystemTablePluginConfig configuration, DrillbitContext context, String name){
+  public SystemTablePlugin(SystemTablePluginConfig config, DrillbitContext context, String name) {
+    this.config = config;
     this.context = context;
     this.name = name;
   }
 
-  private SystemSchema schema = new SystemSchema();
-
   @Override
   public StoragePluginConfig getConfig() {
-    return SystemTablePluginConfig.INSTANCE;
+    return config;
+  }
+
+  @JsonIgnore
+  public DrillbitContext getContext() {
+    return this.context;
   }
 
   @Override
@@ -64,35 +70,23 @@ public class SystemTablePlugin extends AbstractStoragePlugin{
     parent.add(schema.getName(), schema);
   }
 
-  public Iterator<Object> getRecordIterator(FragmentContext context, SystemTable table){
-    switch(table){
-    case VERSION:
-      return new VersionIterator();
-    case DRILLBITS:
-      return new DrillbitIterator(context);
-    case OPTION:
-      return Iterables.concat((Iterable<Object>)(Object) new DrillConfigIterator(context.getConfig()), //
-          context.getOptions()).iterator();
-    default:
-      throw new UnsupportedOperationException("Unable to create record iterator for table: " + table.getTableName());
-    }
-  }
-
-
   @Override
   public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
     SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
     return new SystemTableScan(table, this);
   }
 
-  private class SystemSchema extends AbstractSchema{
+  /**
+   * This class defines a namespace for {@link org.apache.drill.exec.store.sys.SystemTable}s
+   */
+  private class SystemSchema extends AbstractSchema {
 
-    private Set<String> tableNames;
+    private final Set<String> tableNames;
 
     public SystemSchema() {
-      super(ImmutableList.<String>of(), "sys");
+      super(ImmutableList.<String>of(), SYS_SCHEMA_NAME);
       Set<String> names = Sets.newHashSet();
-      for(SystemTable t : SystemTable.values()){
+      for (SystemTable t : SystemTable.values()) {
         names.add(t.getTableName());
       }
       this.tableNames = ImmutableSet.copyOf(names);
@@ -103,16 +97,14 @@ public class SystemTablePlugin extends AbstractStoragePlugin{
       return tableNames;
     }
 
-
     @Override
     public DrillTable getTable(String name) {
-      for(SystemTable table : SystemTable.values()){
-        if(table.getTableName().equalsIgnoreCase(name)){
-          return new StaticDrillTable(table.getType(), SystemTablePlugin.this.name, SystemTablePlugin.this, table);
+      for (SystemTable table : SystemTable.values()) {
+        if (table.getTableName().equalsIgnoreCase(name)) {
+          return new StaticDrillTable(SystemTablePlugin.this.name, SystemTablePlugin.this, table, table.getDataType());
         }
       }
       return null;
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
index 93fe68e..b3348a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
@@ -19,15 +19,17 @@ package org.apache.drill.exec.store.sys;
 
 import org.apache.drill.common.logical.StoragePluginConfig;
 
+/**
+ * A namesake plugin configuration for system tables.
+ */
 public class SystemTablePluginConfig extends StoragePluginConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
-
-  public static String NAME = "system-tables";
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
 
-  public static SystemTablePluginConfig INSTANCE = new SystemTablePluginConfig();
+  public static final String NAME = "system-tables";
 
-  private SystemTablePluginConfig(){
+  public static final SystemTablePluginConfig INSTANCE = new SystemTablePluginConfig();
 
+  private SystemTablePluginConfig() {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index cdd0d18..ce029ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
@@ -37,12 +38,13 @@ import org.apache.drill.exec.store.StoragePluginRegistry;
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("sys")
-public class SystemTableScan extends AbstractGroupScan implements SubScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);
+public class SystemTableScan extends AbstractGroupScan implements SubScan {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);
 
   private final SystemTable table;
   private final SystemTablePlugin plugin;
@@ -51,17 +53,22 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
   public SystemTableScan( //
       @JsonProperty("table") SystemTable table, //
       @JacksonInject StoragePluginRegistry engineRegistry //
-      ) throws IOException, ExecutionSetupException {
+  ) throws IOException, ExecutionSetupException {
     this.table = table;
     this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
   }
 
-  public SystemTableScan(SystemTable table, SystemTablePlugin plugin){
+  public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
     this.table = table;
     this.plugin = plugin;
   }
 
-  public ScanStats getScanStats(){
+  /**
+   * System tables do not need stats.
+   * @return a trivial stats table
+   */
+  @Override
+  public ScanStats getScanStats() {
     return ScanStats.TRIVIAL_TABLE;
   }
 
@@ -79,9 +86,22 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
     return this;
   }
 
+  // If distributed, the scan needs to happen on every node.
   @Override
   public int getMaxParallelizationWidth() {
-    return 1;
+    return table.isDistributed() ? plugin.getContext().getBits().size() : 1;
+  }
+
+  // If distributed, the scan needs to happen on every node.
+  @Override
+  public int getMinParallelizationWidth() {
+    return table.isDistributed() ? plugin.getContext().getBits().size() : 1;
+  }
+
+  // This enforces maximum parallelization width.
+  @Override
+  public boolean enforceWidth() {
+    return true;
   }
 
   @Override
@@ -96,7 +116,8 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
 
   @Override
   public String getDigest() {
-    return "SystemTableScan: " + table.name();
+    return "SystemTableScan [table=" + table.name() +
+      ", distributed=" + table.isDistributed() + "]";
   }
 
   @Override
@@ -104,6 +125,25 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
     return CoreOperatorType.SYSTEM_TABLE_SCAN_VALUE;
   }
 
+  /**
+   * If distributed, the scan needs to happen on every node. Since width is enforced, the number of fragments equals
+   * number of Drillbits. And here we set, endpoint affinities to Double.POSITIVE_INFINITY to ensure every
+   * Drillbit executes a fragment.
+   * @return the Drillbit endpoint affinities
+   */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (table.isDistributed()) {
+      final List<EndpointAffinity> affinities = Lists.newArrayList();
+      for (final DrillbitEndpoint endpoint : plugin.getContext().getBits()) {
+        affinities.add(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY));
+      }
+      return affinities;
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
     return this;
@@ -113,9 +153,9 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
     return table;
   }
 
+  @JsonIgnore
   public SystemTablePlugin getPlugin() {
     return plugin;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
new file mode 100644
index 0000000..b184880
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+
+/**
+ * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit threads
+ */
+public class ThreadsRecord extends SystemRecord {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ThreadsRecord.class);
+
+  private static final ThreadsRecord INSTANCE = new ThreadsRecord();
+
+  public static SystemRecord getInstance() {
+    return INSTANCE;
+  }
+
+  private static final String HOST_NAME = "hostname";
+  private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
+    Types.required(TypeProtos.MinorType.VARCHAR));
+  private VarCharVector hostName;
+
+  private static final String USER_PORT = "user_port";
+  private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector userPort;
+
+  private static final String TOTAL_THREADS = "total_threads";
+  private static final MaterializedField totalThreadsField = MaterializedField.create(TOTAL_THREADS,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector totalThreads;
+
+  private static final String BUSY_THREADS = "busy_threads";
+  private static final MaterializedField busyThreadsField = MaterializedField.create(BUSY_THREADS,
+    Types.required(TypeProtos.MinorType.BIGINT));
+  private BigIntVector busyThreads;
+
+  private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
+    SqlTypeName.BIGINT, SqlTypeName.BIGINT);
+
+  private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, TOTAL_THREADS, BUSY_THREADS);
+
+  private ThreadsRecord() {
+  }
+
+  @Override
+  public void setup(final OutputMutator output) throws SchemaChangeException {
+    hostName = output.addField(hostNameField, VarCharVector.class);
+    userPort = output.addField(userPortField, BigIntVector.class);
+    totalThreads = output.addField(totalThreadsField, BigIntVector.class);
+    busyThreads = output.addField(busyThreadsField, BigIntVector.class);
+  }
+
+  @Override
+  public void setRecordValues(final FragmentContext context) {
+    final DrillbitContext drillbitContext = context.getDrillbitContext();
+    final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+    final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+    final String address = endpoint.getAddress();
+    final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
+    hostNameMutator.setSafe(0, address.getBytes());
+    hostNameMutator.setValueCount(1);
+
+    final int port = endpoint.getUserPort();
+    final BigIntVector.Mutator userPortMutator = userPort.getMutator();
+    userPortMutator.setSafe(0, port);
+    userPortMutator.setValueCount(1);
+
+    final BigIntVector.Mutator totalThreadsMutator = totalThreads.getMutator();
+    totalThreadsMutator.setSafe(0, threadMXBean.getPeakThreadCount());
+    totalThreadsMutator.setValueCount(1);
+
+    final BigIntVector.Mutator busyThreadsMutator = busyThreads.getMutator();
+    busyThreadsMutator.setSafe(0, threadMXBean.getThreadCount());
+    busyThreadsMutator.setValueCount(1);
+  }
+
+  @Override
+  public List<SqlTypeName> getFieldSqlTypeNames() {
+    return FIELDS;
+  }
+
+  @Override
+  public List<String> getFieldNames() {
+    return NAMES;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index c1803bc..4f4d29b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -44,4 +44,16 @@ public class TestSystemTable extends BaseTestQuery {
       .baselineValues(true)
       .go();
   }
+
+  // need to enhance this
+  @Test
+  public void testThreadsTable() throws Exception {
+    test("select * from sys.threads");
+  }
+
+  // need to enhance this
+  @Test
+  public void testMemoryTable() throws Exception {
+    test("select * from sys.memory");
+  }
 }


Mime
View raw message