drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From meh...@apache.org
Subject [2/2] drill git commit: DRILL-2766: Removed SystemRecordReader and SystemRecords + Use PojoRecordReader with ThreadsIterator and MemoryIterator + Includes DRILL-2670: Added a test case since DRILL-2714 resolves this issue
Date Tue, 14 Apr 2015 20:54:02 GMT
DRILL-2766: Removed SystemRecordReader and SystemRecords
+ Use PojoRecordReader with ThreadsIterator and MemoryIterator
+ Includes DRILL-2670: Added a test case since DRILL-2714 resolves this issue


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7cee11c2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7cee11c2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7cee11c2

Branch: refs/heads/master
Commit: 7cee11c2bf24b0dd532f1068526bf7caecc9a935
Parents: 5441e72
Author: Sudheesh Katkam <skatkam@maprtech.com>
Authored: Fri Apr 10 14:07:39 2015 -0700
Committer: Mehant Baid <mehantr@gmail.com>
Committed: Tue Apr 14 10:56:26 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/sys/MemoryIterator.java    |  77 ++++++++++
 .../drill/exec/store/sys/MemoryRecord.java      | 141 -------------------
 .../drill/exec/store/sys/SystemRecord.java      |  44 ------
 .../exec/store/sys/SystemRecordReader.java      |  77 ----------
 .../drill/exec/store/sys/SystemTable.java       |  46 +++---
 .../exec/store/sys/SystemTableBatchCreator.java |  12 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |   4 +-
 .../drill/exec/store/sys/ThreadsIterator.java   |  70 +++++++++
 .../drill/exec/store/sys/ThreadsRecord.java     | 119 ----------------
 .../drill/exec/store/sys/TestSystemTable.java   |  14 +-
 10 files changed, 180 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
new file mode 100644
index 0000000..4ceeebe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.util.Iterator;
+
+public class MemoryIterator implements Iterator<Object> {
+
+  private boolean beforeFirst = true;
+  private final FragmentContext context;
+
+  public MemoryIterator(final FragmentContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return beforeFirst;
+  }
+
+  @Override
+  public Object next() {
+    if (!beforeFirst) {
+      throw new IllegalStateException();
+    }
+    beforeFirst = false;
+    final MemoryInfo memoryInfo = new MemoryInfo();
+
+    final DrillbitEndpoint endpoint = context.getIdentity();
+    memoryInfo.hostname = endpoint.getAddress();
+    memoryInfo.user_port = endpoint.getUserPort();
+
+    final MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+    memoryInfo.heap_current = heapMemoryUsage.getUsed();
+    memoryInfo.heap_max = heapMemoryUsage.getMax();
+
+    memoryInfo.direct_current = context.getDrillbitContext().getAllocator().getAllocatedMemory();
+    memoryInfo.direct_max = TopLevelAllocator.MAXIMUM_DIRECT_MEMORY;
+    return memoryInfo;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  public static class MemoryInfo {
+    public String hostname;
+    public long user_port;
+    public long heap_current;
+    public long heap_max;
+    public long direct_current;
+    public long direct_max;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
deleted file mode 100644
index 9cb001d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import com.google.common.collect.Lists;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.eigenbase.sql.type.SqlTypeName;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.util.List;
-
-/**
- * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit memory
- */
-public class MemoryRecord extends SystemRecord {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryRecord.class);
-
-  private static final MemoryRecord INSTANCE = new MemoryRecord();
-
-  public static SystemRecord getInstance() {
-    return INSTANCE;
-  }
-
-  private static final String HOST_NAME = "hostname";
-  private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
-    Types.required(TypeProtos.MinorType.VARCHAR));
-  private VarCharVector hostName;
-
-  private static final String USER_PORT = "user_port";
-  private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector userPort;
-
-  private static final String CURRENT_HEAP_SIZE = "heap_current";
-  private static final MaterializedField currentHeapSizeField = MaterializedField.create(CURRENT_HEAP_SIZE,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector currentHeapSize;
-
-  private static final String MAX_HEAP_SIZE = "heap_max";
-  private static final MaterializedField maxHeapSizeField = MaterializedField.create(MAX_HEAP_SIZE,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector maxHeapSize;
-
-  private static final String CURRENT_DIRECT_MEMORY = "direct_current";
-  private static final MaterializedField currentDirectMemoryField = MaterializedField.create(CURRENT_DIRECT_MEMORY,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector currentDirectMemory;
-
-  private static final String MAX_DIRECT_MEMORY = "direct_max";
-  private static final MaterializedField maxDirectMemoryField = MaterializedField.create(MAX_DIRECT_MEMORY,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector maxDirectMemory;
-
-  private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
-    SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT);
-
-  private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, CURRENT_HEAP_SIZE, MAX_HEAP_SIZE,
-    CURRENT_DIRECT_MEMORY, MAX_DIRECT_MEMORY);
-
-  private MemoryRecord() {
-  }
-
-  @Override
-  public void setup(final OutputMutator output) throws SchemaChangeException {
-    hostName = output.addField(hostNameField, VarCharVector.class);
-    userPort = output.addField(userPortField, BigIntVector.class);
-    currentHeapSize = output.addField(currentHeapSizeField, BigIntVector.class);
-    maxHeapSize = output.addField(maxHeapSizeField, BigIntVector.class);
-    currentDirectMemory = output.addField(currentDirectMemoryField, BigIntVector.class);
-    maxDirectMemory = output.addField(maxDirectMemoryField, BigIntVector.class);
-  }
-
-  @Override
-  public void setRecordValues(final FragmentContext context) {
-    final DrillbitContext drillbitContext = context.getDrillbitContext();
-    final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-
-    final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
-    final String address = endpoint.getAddress();
-    final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
-    hostNameMutator.setSafe(0, address.getBytes());
-    hostNameMutator.setValueCount(1);
-
-    final int port = endpoint.getUserPort();
-    final BigIntVector.Mutator userPortMutator = userPort.getMutator();
-    userPortMutator.setSafe(0, port);
-    userPortMutator.setValueCount(1);
-
-    final BigIntVector.Mutator currentHeapSizeMutator = currentHeapSize.getMutator();
-    currentHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getUsed());
-    currentHeapSizeMutator.setValueCount(1);
-
-    final BigIntVector.Mutator maxHeapSizeMutator = maxHeapSize.getMutator();
-    maxHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getMax());
-    maxHeapSizeMutator.setValueCount(1);
-
-    final BigIntVector.Mutator currentDirectMemoryMutator = currentDirectMemory.getMutator();
-    currentDirectMemoryMutator.setSafe(0, drillbitContext.getAllocator().getAllocatedMemory());
-    currentDirectMemoryMutator.setValueCount(1);
-
-    final BigIntVector.Mutator maxDirectMemoryMutator = maxDirectMemory.getMutator();
-    maxDirectMemoryMutator.setSafe(0, TopLevelAllocator.MAXIMUM_DIRECT_MEMORY);
-    maxDirectMemoryMutator.setValueCount(1);
-  }
-
-  @Override
-  public List<SqlTypeName> getFieldSqlTypeNames() {
-    return FIELDS;
-  }
-
-  @Override
-  public List<String> getFieldNames() {
-    return NAMES;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
deleted file mode 100644
index 5bdb9b1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.RecordDataType;
-
-/**
- * A system record holds system information (e.g. memory usage).
- * Currently, there is only one such system record per Drillbit.
- */
-public abstract class SystemRecord extends RecordDataType {
-
-  /**
-   * Setup value vectors to hold system information
-   * @param output the mutator from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
-   * @throws SchemaChangeException
-   */
-  public abstract void setup(OutputMutator output) throws SchemaChangeException;
-
-  /**
-   * Set the values of value vectors when requested
-   * @param context the context from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
-   */
-  public abstract void setRecordValues(FragmentContext context);
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
deleted file mode 100644
index 9f8d0d9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-
-/**
- * A record reader to populate a {@link SystemRecord}.
- */
-public class SystemRecordReader extends AbstractRecordReader {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemRecordReader.class);
-
-  private final FragmentContext fragmentContext;
-  private final SystemRecord record;
-  private boolean read;
-
-  private OperatorContext operatorContext;
-
-  public SystemRecordReader(FragmentContext context, SystemRecord record) {
-    this.fragmentContext = context;
-    this.record = record;
-    this.read = false;
-  }
-
-  @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
-    try {
-      record.setup(output);
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  @Override
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public int next() {
-    // send only one record
-    if (!read) {
-      record.setRecordValues(fragmentContext);
-      read = true;
-      return 1;
-    }
-    return 0;
-  }
-
-  @Override
-  public void cleanup() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 2c338ca..e2ac9ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -22,8 +22,6 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.server.options.DrillConfigIterator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.store.RecordDataType;
-import org.apache.drill.exec.store.pojo.PojoDataType;
 
 import java.util.Iterator;
 
@@ -35,40 +33,40 @@ import java.util.Iterator;
  */
 public enum SystemTable {
 
-  OPTION("options", false, new PojoDataType(OptionValue.class)) {
+  OPTION("options", false, OptionValue.class) {
     @Override
-    public Iterator<Object> getLocalIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final FragmentContext context) {
       final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
       final OptionManager fragmentOptions = context.getOptions();
       return (Iterator<Object>) (Object) Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
     }
   },
 
-  DRILLBITS("drillbits", false, new PojoDataType(DrillbitIterator.DrillbitInstance.class)) {
+  DRILLBITS("drillbits", false,DrillbitIterator.DrillbitInstance.class) {
     @Override
-    public Iterator<Object> getLocalIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final FragmentContext context) {
       return new DrillbitIterator(context);
     }
   },
 
-  VERSION("version", false, new PojoDataType(VersionIterator.VersionInfo.class)) {
+  VERSION("version", false, VersionIterator.VersionInfo.class) {
     @Override
-    public Iterator<Object> getLocalIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final FragmentContext context) {
       return new VersionIterator();
     }
   },
 
-  MEMORY("memory", true, MemoryRecord.getInstance()) {
+  MEMORY("memory", true, MemoryIterator.MemoryInfo.class) {
     @Override
-    public SystemRecord getSystemRecord() {
-      return MemoryRecord.getInstance();
+    public Iterator<Object> getIterator(final FragmentContext context) {
+      return new MemoryIterator(context);
     }
   },
 
-  THREADS("threads", true, ThreadsRecord.getInstance()) {
+  THREADS("threads", true, ThreadsIterator.ThreadsInfo.class) {
     @Override
-    public SystemRecord getSystemRecord() {
-      return ThreadsRecord.getInstance();
+  public Iterator<Object> getIterator(final FragmentContext context) {
+      return new ThreadsIterator(context);
     }
   };
 
@@ -76,22 +74,16 @@ public enum SystemTable {
 
   private final String tableName;
   private final boolean distributed;
-  private final RecordDataType dataType;
+  private final Class<?> pojoClass;
 
-  SystemTable(String tableName, boolean distributed, RecordDataType dataType) {
+  SystemTable(final String tableName, final boolean distributed, final Class<?> pojoClass) {
     this.tableName = tableName;
     this.distributed = distributed;
-    this.dataType = dataType;
+    this.pojoClass = pojoClass;
   }
 
-  // Distributed tables must override this method
-  public SystemRecord getSystemRecord() {
-    throw new UnsupportedOperationException("Local table does not support this function.");
-  }
-
-  // Local tables must override this method
-  public Iterator<Object> getLocalIterator(FragmentContext context) {
-    throw new UnsupportedOperationException("Distributed table does not support this function.");
+  public Iterator<Object> getIterator(final FragmentContext context) {
+    throw new UnsupportedOperationException(tableName + " must override this method.");
   }
 
   public String getTableName() {
@@ -102,8 +94,8 @@ public enum SystemTable {
     return distributed;
   }
 
-  public RecordDataType getDataType() {
-    return dataType;
+  public Class getPojoClass() {
+    return pojoClass;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 0152b63..92f676a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.pojo.PojoDataType;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
 /**
@@ -44,15 +43,8 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
                               final List<RecordBatch> children)
     throws ExecutionSetupException {
     final SystemTable table = scan.getTable();
-    final RecordReader reader;
-    if (table.isDistributed()) {
-      final SystemRecord record = table.getSystemRecord();
-      reader = new SystemRecordReader(context, record);
-    } else {
-      final Iterator<Object> iter = table.getLocalIterator(context);
-      final PojoDataType type = (PojoDataType) table.getDataType();
-      reader = new PojoRecordReader(type.getPojoClass(), iter);
-    }
+    final Iterator<Object> iterator = table.getIterator(context);
+    final RecordReader reader = new PojoRecordReader(table.getPojoClass(), iterator);
 
     return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 13e0ff6..b92f98c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.pojo.PojoDataType;
 
 /**
  * A "storage" plugin for system tables.
@@ -101,7 +102,8 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
     public DrillTable getTable(String name) {
       for (SystemTable table : SystemTable.values()) {
         if (table.getTableName().equalsIgnoreCase(name)) {
-          return new StaticDrillTable(SystemTablePlugin.this.name, SystemTablePlugin.this, table, table.getDataType());
+          return new StaticDrillTable(SystemTablePlugin.this.name, SystemTablePlugin.this, table,
+            new PojoDataType(table.getPojoClass()));
         }
       }
       return null;

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
new file mode 100644
index 0000000..e9bc7ff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.Iterator;
+
+public class ThreadsIterator implements Iterator<Object> {
+
+  private boolean beforeFirst = true;
+  private final FragmentContext context;
+
+  public ThreadsIterator(final FragmentContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return beforeFirst;
+  }
+
+  @Override
+  public Object next() {
+    if (!beforeFirst) {
+      throw new IllegalStateException();
+    }
+    beforeFirst = false;
+    final ThreadsInfo threadsInfo = new ThreadsInfo();
+
+    final DrillbitEndpoint endpoint = context.getIdentity();
+    threadsInfo.hostname = endpoint.getAddress();
+    threadsInfo.user_port = endpoint.getUserPort();
+
+    final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+    threadsInfo.total_threads = threadMXBean.getPeakThreadCount();
+    threadsInfo.busy_threads = threadMXBean.getThreadCount();
+    return threadsInfo;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  public static class ThreadsInfo {
+    public String hostname;
+    public long user_port;
+    public long total_threads;
+    public long busy_threads;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
deleted file mode 100644
index b184880..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import com.google.common.collect.Lists;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.eigenbase.sql.type.SqlTypeName;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.util.List;
-
-/**
- * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit threads
- */
-public class ThreadsRecord extends SystemRecord {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ThreadsRecord.class);
-
-  private static final ThreadsRecord INSTANCE = new ThreadsRecord();
-
-  public static SystemRecord getInstance() {
-    return INSTANCE;
-  }
-
-  private static final String HOST_NAME = "hostname";
-  private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
-    Types.required(TypeProtos.MinorType.VARCHAR));
-  private VarCharVector hostName;
-
-  private static final String USER_PORT = "user_port";
-  private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector userPort;
-
-  private static final String TOTAL_THREADS = "total_threads";
-  private static final MaterializedField totalThreadsField = MaterializedField.create(TOTAL_THREADS,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector totalThreads;
-
-  private static final String BUSY_THREADS = "busy_threads";
-  private static final MaterializedField busyThreadsField = MaterializedField.create(BUSY_THREADS,
-    Types.required(TypeProtos.MinorType.BIGINT));
-  private BigIntVector busyThreads;
-
-  private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
-    SqlTypeName.BIGINT, SqlTypeName.BIGINT);
-
-  private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, TOTAL_THREADS, BUSY_THREADS);
-
-  private ThreadsRecord() {
-  }
-
-  @Override
-  public void setup(final OutputMutator output) throws SchemaChangeException {
-    hostName = output.addField(hostNameField, VarCharVector.class);
-    userPort = output.addField(userPortField, BigIntVector.class);
-    totalThreads = output.addField(totalThreadsField, BigIntVector.class);
-    busyThreads = output.addField(busyThreadsField, BigIntVector.class);
-  }
-
-  @Override
-  public void setRecordValues(final FragmentContext context) {
-    final DrillbitContext drillbitContext = context.getDrillbitContext();
-    final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
-
-    final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
-    final String address = endpoint.getAddress();
-    final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
-    hostNameMutator.setSafe(0, address.getBytes());
-    hostNameMutator.setValueCount(1);
-
-    final int port = endpoint.getUserPort();
-    final BigIntVector.Mutator userPortMutator = userPort.getMutator();
-    userPortMutator.setSafe(0, port);
-    userPortMutator.setValueCount(1);
-
-    final BigIntVector.Mutator totalThreadsMutator = totalThreads.getMutator();
-    totalThreadsMutator.setSafe(0, threadMXBean.getPeakThreadCount());
-    totalThreadsMutator.setValueCount(1);
-
-    final BigIntVector.Mutator busyThreadsMutator = busyThreads.getMutator();
-    busyThreadsMutator.setSafe(0, threadMXBean.getThreadCount());
-    busyThreadsMutator.setValueCount(1);
-  }
-
-  @Override
-  public List<SqlTypeName> getFieldSqlTypeNames() {
-    return FIELDS;
-  }
-
-  @Override
-  public List<String> getFieldNames() {
-    return NAMES;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index 4f4d29b..e86fc28 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.junit.Test;
 
 public class TestSystemTable extends BaseTestQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSystemTable.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSystemTable.class);
 
   @Test
   public void alterSessionOption() throws Exception {
@@ -45,15 +45,19 @@ public class TestSystemTable extends BaseTestQuery {
       .go();
   }
 
-  // need to enhance this
+  // DRILL-2670
   @Test
-  public void testThreadsTable() throws Exception {
+  public void optionsOrderBy() throws Exception {
+    test("select * from sys.options order by name");
+  }
+
+  @Test
+  public void threadsTable() throws Exception {
     test("select * from sys.threads");
   }
 
-  // need to enhance this
   @Test
-  public void testMemoryTable() throws Exception {
+  public void memoryTable() throws Exception {
     test("select * from sys.memory");
   }
 }


Mime
View raw message