pirk-commits mailing list archives

From eawilli...@apache.org
Subject [1/6] incubator-pirk git commit: [PIRK-19] Make DataSchema/QuerySchema Agnostic of Persistent Representation -- closes apache/incubator-pirk#28
Date Thu, 28 Jul 2016 19:06:34 GMT
Repository: incubator-pirk
Updated Branches:
  refs/heads/master 89052a78a -> 7f260e03f


[PIRK-19] Make DataSchema/QuerySchema Agnostic of Persistent Representation -- closes apache/incubator-pirk#28


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/c557fffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/c557fffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/c557fffe

Branch: refs/heads/master
Commit: c557fffe76bb4d144fa67b7071d392566115f282
Parents: 89052a7
Author: tellison <tellison@apache.org>
Authored: Thu Jul 28 14:56:55 2016 -0400
Committer: eawilliams <eawilliams@apache.org>
Committed: Thu Jul 28 14:56:55 2016 -0400

----------------------------------------------------------------------
 .../pirk/schema/data/DataSchemaLoader.java      | 113 +++++-----
 .../data/partitioner/DataPartitioner.java       |  14 +-
 .../data/partitioner/IPDataPartitioner.java     |  12 +-
 .../partitioner/ISO8601DatePartitioner.java     |  23 +-
 .../partitioner/PrimitiveTypePartitioner.java   |  19 +-
 .../pirk/schema/query/QuerySchemaLoader.java    | 216 ++++++++++++-------
 .../pirk/schema/query/filter/FilterFactory.java |  15 +-
 src/main/resources/query-schema.xsd             |  81 ++++++-
 8 files changed, 321 insertions(+), 172 deletions(-)
----------------------------------------------------------------------

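The central change: DataSchemaLoader and QuerySchemaLoader now parse schemas from a caller-supplied InputStream instead of opening files themselves, and the partitioner/filter plumbing throws the checked PIRException in place of the blanket Exception. A minimal sketch of the new loader API, assuming a local schema file "my-data-schema.xml" (the file name is hypothetical):

    import java.io.FileInputStream;
    import java.io.InputStream;

    import org.apache.pirk.schema.data.DataSchema;
    import org.apache.pirk.schema.data.DataSchemaLoader;
    import org.apache.pirk.schema.data.DataSchemaRegistry;

    // Parse a data schema from any stream and register it for later lookup.
    DataSchemaLoader loader = new DataSchemaLoader();
    try (InputStream is = new FileInputStream("my-data-schema.xml"))
    {
      DataSchema dataSchema = loader.loadSchema(is); // IOException on read failure, PIRException on an invalid schema
      DataSchemaRegistry.put(dataSchema);
    }
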

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index f0cca32..1199cd6 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -30,7 +30,6 @@ import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
 import org.apache.pirk.utils.PIRException;
@@ -69,9 +68,9 @@ public class DataSchemaLoader
 {
   private static final Logger logger = LoggerFactory.getLogger(DataSchemaLoader.class);
 
-  private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
-      PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE,
-      PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING));
+  private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>(
+      Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG,
+          PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING));
 
   static
   {
@@ -87,51 +86,53 @@ public class DataSchemaLoader
     }
   }
 
-  /** Kept for compatibility */
+  /* Kept for compatibility */
   public static void initialize() throws Exception
   {
     initialize(false, null);
   }
 
-  /** Kept for compatibility */
+  /* Kept for compatibility */
   public static void initialize(boolean hdfs, FileSystem fs) throws Exception
   {
     String dataSchemas = SystemConfiguration.getProperty("data.schemas", "none");
-    if (!dataSchemas.equals("none"))
+    if (dataSchemas.equals("none"))
     {
-      String[] dataSchemaFiles = dataSchemas.split(",");
-      for (String schemaFile : dataSchemaFiles)
-      {
-        logger.info("Loading schemaFile = " + schemaFile + " hdfs = " + hdfs);
+      return;
+    }
 
-        // Parse and load the schema file into a DataSchema object; place in the schemaMap
-        DataSchemaLoader loader = new DataSchemaLoader();
-        InputStream is;
-        if (hdfs)
-        {
-          is = fs.open(new Path(schemaFile));
-          logger.info("hdfs: filePath = " + schemaFile.toString());
-        }
-        else
-        {
-          is = new FileInputStream(schemaFile);
-          logger.info("localFS: inputFile = " + schemaFile.toString());
-        }
+    String[] dataSchemaFiles = dataSchemas.split(",");
+    for (String schemaFile : dataSchemaFiles)
+    {
+      logger.info("Loading schemaFile = " + schemaFile + " hdfs = " + hdfs);
+
+      // Parse and load the schema file into a DataSchema object; place in the schemaMap
+      DataSchemaLoader loader = new DataSchemaLoader();
+      InputStream is;
+      if (hdfs)
+      {
+        is = fs.open(new Path(schemaFile));
+        logger.info("hdfs: filePath = " + schemaFile);
+      }
+      else
+      {
+        is = new FileInputStream(schemaFile);
+        logger.info("localFS: inputFile = " + schemaFile);
+      }
 
-        try
-        {
-          DataSchema dataSchema = loader.loadSchema(is);
-          DataSchemaRegistry.put(dataSchema);
-        } finally
-        {
-          is.close();
-        }
+      try
+      {
+        DataSchema dataSchema = loader.loadSchema(is);
+        DataSchemaRegistry.put(dataSchema);
+      } finally
+      {
+        is.close();
       }
     }
   }
 
-  /*
-   * Default constructor
+  /**
+   * Default constructor.
    */
   public DataSchemaLoader()
   {}
@@ -149,18 +150,8 @@ public class DataSchemaLoader
    */
   public DataSchema loadSchema(InputStream stream) throws IOException, PIRException
   {
-    // Read in and parse the XML schema file
-    Document doc;
-    try
-    {
-      DocumentBuilder dBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
-      doc = dBuilder.parse(stream);
-    } catch (ParserConfigurationException | SAXException e)
-    {
-      throw new PIRException("Data schema parsing error", e);
-    }
-    doc.getDocumentElement().normalize();
-    logger.info("Root element: " + doc.getDocumentElement().getNodeName());
+    // Read the XML schema file.
+    Document doc = parseXMLDocument(stream);
 
     // Extract the schemaName
     NodeList schemaNameList = doc.getElementsByTagName("schemaName");
@@ -181,19 +172,34 @@ public class DataSchemaLoader
       Node nNode = nList.item(i);
       if (nNode.getNodeType() == Node.ELEMENT_NODE)
       {
-        parseElementNode((Element) nNode, dataSchema);
+        extractElementNode((Element) nNode, dataSchema);
       }
     }
 
     return dataSchema;
   }
 
-  private void parseElementNode(Element eElement, DataSchema schema) throws PIRException
+  private Document parseXMLDocument(InputStream stream) throws IOException, PIRException
+  {
+    Document doc;
+    try
+    {
+      DocumentBuilder dBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+      doc = dBuilder.parse(stream);
+    } catch (ParserConfigurationException | SAXException e)
+    {
+      throw new PIRException("Schema parsing error", e);
+    }
+    doc.getDocumentElement().normalize();
+    logger.info("Root element: " + doc.getDocumentElement().getNodeName());
+
+    return doc;
+  }
+
+  private void extractElementNode(Element eElement, DataSchema schema) throws PIRException
   {
     // Pull out the element name and type attributes.
     String name = eElement.getElementsByTagName("name").item(0).getTextContent().trim();
-    schema.getTextRep().put(name, new Text(name));
-
     String type = eElement.getElementsByTagName("type").item(0).getTextContent().trim();
     schema.getTypeMap().put(name, type);
 
@@ -249,6 +255,11 @@ public class DataSchemaLoader
     }
   }
 
+  /*
+   * Creates a new instance of a class with the given type name.
+   * 
+   * Throws an exception if the class cannot be instantiated, or it does not implement the required interface.
+   */
   DataPartitioner instantiatePartitioner(String partitionerTypeName) throws PIRException
   {
     Object obj;
@@ -259,7 +270,7 @@ public class DataSchemaLoader
       obj = c.newInstance();
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | ClassCastException e)
     {
-      throw new PIRException("partitioner = " + partitionerTypeName + " cannot be instantiated or does not implement DataParitioner.", e);
+      throw new PIRException("partitioner = " + partitionerTypeName + " cannot be instantiated or does not implement DataPartitioner.", e);
     }
 
     return (DataPartitioner) obj;

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java
index cd1e632..8981dcf 100644
--- a/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java
+++ b/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java
@@ -23,6 +23,8 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.pirk.utils.PIRException;
+
 /**
  * Interface for data partitioning
  * <p>
@@ -35,32 +37,32 @@ public interface DataPartitioner extends Serializable
    * <p>
    * If the Object does not have/need a specific type identifier, use null
    */
-  ArrayList<BigInteger> toPartitions(Object object, String type) throws Exception;
+  ArrayList<BigInteger> toPartitions(Object object, String type) throws PIRException;
 
   /**
   * Method to reconstruct an Object given an ArrayList of its BigInteger partition elements and its type identifier
    * <p>
    * If the Object does not have/need a specific type identifier, use null
    */
-  Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception;
+  Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws PIRException;
 
   /**
    * Method to return the number of bits of an object with the given type
    */
-  int getBits(String type) throws Exception;
+  int getBits(String type) throws PIRException;
 
   /**
   * Create partitions for an array of the same type of elements - used when a data value field is an array and we wish to encode these into the return value
    */
-  ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception;
+  ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException;
 
   /**
   * Method to get an empty set of partitions by data type - used for padding return array values
    */
-  ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception;
+  ArrayList<BigInteger> getPaddedPartitions(String type) throws PIRException;
 
   /**
    * Method to get the number of partitions of the data object given the type
    */
-  int getNumPartitions(String type) throws Exception;
+  int getNumPartitions(String type) throws PIRException;
 }
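
With the interface narrowed from Exception to the checked PIRException, implementations can drop throws clauses they do not need (as IPDataPartitioner does below) and wrap genuine failures uniformly. A sketch of a custom partitioner under the new contract -- the class name and 16-bit layout are illustrative, not part of this commit:

    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.pirk.schema.data.partitioner.DataPartitioner;
    import org.apache.pirk.utils.PIRException;

    // Hypothetical partitioner splitting a 16-bit value into two 8-bit partitions.
    public class ShortValuePartitioner implements DataPartitioner
    {
      private static final long serialVersionUID = 1L;

      @Override
      public ArrayList<BigInteger> toPartitions(Object object, String type) throws PIRException
      {
        try
        {
          short value = Short.parseShort(object.toString());
          ArrayList<BigInteger> parts = new ArrayList<>();
          parts.add(BigInteger.valueOf((value >> 8) & 0xFF)); // high byte
          parts.add(BigInteger.valueOf(value & 0xFF));        // low byte
          return parts;
        } catch (NumberFormatException e)
        {
          throw new PIRException("Unable to parse short value " + object, e);
        }
      }

      @Override
      public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type)
      {
        int high = parts.get(partsIndex).intValue();
        int low = parts.get(partsIndex + 1).intValue();
        return (short) ((high << 8) | low);
      }

      @Override
      public int getBits(String type)
      {
        return Short.SIZE;
      }

      @Override
      public ArrayList<BigInteger> getPaddedPartitions(String type)
      {
        ArrayList<BigInteger> parts = new ArrayList<>();
        parts.add(BigInteger.ZERO);
        parts.add(BigInteger.ZERO);
        return parts;
      }

      @Override
      public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException
      {
        ArrayList<BigInteger> parts = new ArrayList<>();
        for (Object element : elementList)
        {
          parts.addAll(toPartitions(element, type));
        }
        return parts;
      }

      @Override
      public int getNumPartitions(String type)
      {
        return 2;
      }
    }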

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java
index 494aba0..6f458e2 100644
--- a/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java
+++ b/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java
@@ -34,7 +34,7 @@ public class IPDataPartitioner implements DataPartitioner
   private static final long serialVersionUID = 1L;
 
   @Override
-  public ArrayList<BigInteger> toPartitions(Object object, String type) throws Exception
+  public ArrayList<BigInteger> toPartitions(Object object, String type)
   {
     ArrayList<BigInteger> parts = new ArrayList<>();
 
@@ -48,7 +48,7 @@ public class IPDataPartitioner implements DataPartitioner
   }
 
   @Override
-  public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception
+  public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type)
   {
     Object element;
 
@@ -59,13 +59,13 @@ public class IPDataPartitioner implements DataPartitioner
   }
 
   @Override
-  public int getBits(String type) throws Exception
+  public int getBits(String type)
   {
     return Integer.SIZE;
   }
 
   @Override
-  public ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception
+  public ArrayList<BigInteger> getPaddedPartitions(String type)
   {
     ArrayList<BigInteger> parts = new ArrayList<>();
 
@@ -80,7 +80,7 @@ public class IPDataPartitioner implements DataPartitioner
   * Create partitions for an array of the same type of elements - used when a data value field is an array and we wish to encode these into the return value
    */
   @Override
-  public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception
+  public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type)
   {
     ArrayList<BigInteger> parts = new ArrayList<>();
 
@@ -101,7 +101,7 @@ public class IPDataPartitioner implements DataPartitioner
   }
 
   @Override
-  public int getNumPartitions(String type) throws Exception
+  public int getNumPartitions(String type)
   {
     return 4;
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java
index 715bf15..329a083 100644
--- a/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java
+++ b/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java
@@ -19,10 +19,12 @@
 package org.apache.pirk.schema.data.partitioner;
 
 import java.math.BigInteger;
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.pirk.utils.ISO8601DateParser;
+import org.apache.pirk.utils.PIRException;
 
 /**
  * Partitioner class for ISO8601 dates
@@ -41,15 +43,22 @@ public class ISO8601DatePartitioner implements DataPartitioner
   }
 
   @Override
-  public ArrayList<BigInteger> toPartitions(Object object, String type) throws Exception
+  public ArrayList<BigInteger> toPartitions(Object object, String type) throws PIRException
   {
-    long dateLongFormat = ISO8601DateParser.getLongDate((String) object);
+    long dateLongFormat;
+    try
+    {
+      dateLongFormat = ISO8601DateParser.getLongDate((String) object);
+    } catch (ParseException e)
+    {
+      throw new PIRException("Unable to parse ISO8601 date " + object, e);
+    }
 
     return ptp.toPartitions(dateLongFormat, PrimitiveTypePartitioner.LONG);
   }
 
   @Override
-  public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception
+  public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws PIRException
   {
     long dateLongFormat = (long) ptp.fromPartitions(parts, partsIndex, PrimitiveTypePartitioner.LONG);
 
@@ -57,25 +66,25 @@ public class ISO8601DatePartitioner implements DataPartitioner
   }
 
   @Override
-  public int getBits(String type) throws Exception
+  public int getBits(String type)
   {
     return Long.SIZE;
   }
 
   @Override
-  public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception
+  public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException
   {
     return ptp.arrayToPartitions(elementList, PrimitiveTypePartitioner.LONG);
   }
 
   @Override
-  public ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception
+  public ArrayList<BigInteger> getPaddedPartitions(String type) throws PIRException
   {
     return ptp.getPaddedPartitions(PrimitiveTypePartitioner.LONG);
   }
 
   @Override
-  public int getNumPartitions(String type) throws Exception
+  public int getNumPartitions(String type) throws PIRException
   {
     return ptp.getNumPartitions(PrimitiveTypePartitioner.LONG);
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java
index 3688e81..e0473f6 100644
--- a/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java
+++ b/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.http.util.ByteArrayBuffer;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +109,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
    * 
    */
   @Override
-  public int getNumPartitions(String type) throws Exception
+  public int getNumPartitions(String type) throws PIRException
   {
     int partitionSize = 8;
 
@@ -140,7 +141,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
         numParts = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")) / partitionSize;
         break;
       default:
-        throw new Exception("type = " + type + " not recognized!");
+        throw new PIRException("type = " + type + " not recognized!");
     }
     return numParts;
   }
@@ -149,7 +150,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
    * Get the bit size of the allowed primitive java types
    */
   @Override
-  public int getBits(String type) throws Exception
+  public int getBits(String type) throws PIRException
   {
     int bits;
     switch (type)
@@ -179,7 +180,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
         bits = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits"));
         break;
       default:
-        throw new Exception("type = " + type + " not recognized!");
+        throw new PIRException("type = " + type + " not recognized!");
     }
     return bits;
   }
@@ -188,7 +189,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
    * Reconstructs the object from the partitions
    */
   @Override
-  public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception
+  public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws PIRException
   {
     Object element;
 
@@ -242,7 +243,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
         break;
       }
       default:
-        throw new Exception("type = " + type + " not recognized!");
+        throw new PIRException("type = " + type + " not recognized!");
     }
     return element;
   }
@@ -264,7 +265,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
   * Partitions an object to an ArrayList of BigInteger values, currently represents an 8-bit partitioning
    */
   @Override
-  public ArrayList<BigInteger> toPartitions(Object obj, String type) throws Exception
+  public ArrayList<BigInteger> toPartitions(Object obj, String type) throws PIRException
   {
     ArrayList<BigInteger> parts = new ArrayList<>();
 
@@ -378,7 +379,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
   * Method to get an empty set of partitions by data type - used for padding return array values
    */
   @Override
-  public ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception
+  public ArrayList<BigInteger> getPaddedPartitions(String type) throws PIRException
   {
     ArrayList<BigInteger> parts = new ArrayList<>();
 
@@ -433,7 +434,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner
   * Create partitions for an array of the same type of elements - used when a data value field is an array and we wish to encode these into the return value
    */
   @Override
-  public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception
+  public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException
   {
     ArrayList<BigInteger> parts = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
index a8445ca..d767f2d 100644
--- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pirk.schema.query;
 
-import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Set;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +36,7 @@ import org.apache.pirk.schema.data.DataSchemaRegistry;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.query.filter.DataFilter;
 import org.apache.pirk.schema.query.filter.FilterFactory;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +44,7 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
 
 /**
  * Class to load any query schemas specified in the properties file, 'query.schemas'
@@ -85,79 +90,102 @@ public class QuerySchemaLoader
     }
   }
 
+  /* Kept for compatibility */
   public static void initialize() throws Exception
   {
     initialize(false, null);
   }
 
+  /* Kept for compatibility */
   public static void initialize(boolean hdfs, FileSystem fs) throws Exception
   {
     String querySchemas = SystemConfiguration.getProperty("query.schemas", "none");
-    if (!querySchemas.equals("none"))
+    if (querySchemas.equals("none"))
     {
-      String[] querySchemaFiles = querySchemas.split(",");
-      for (String schemaFile : querySchemaFiles)
+      return;
+    }
+    String[] querySchemaFiles = querySchemas.split(",");
+    for (String schemaFile : querySchemaFiles)
+    {
+      logger.info("Loading schemaFile = " + schemaFile);
+
+      // Parse and load the schema file into a QuerySchema object; place in the schemaMap
+      QuerySchemaLoader loader = new QuerySchemaLoader();
+      InputStream is;
+      if (hdfs)
       {
-        logger.info("Loading schemaFile = " + schemaFile);
+        is = fs.open(new Path(schemaFile));
+        logger.info("hdfs: filePath = " + schemaFile);
+      }
+      else
+      {
+        is = new FileInputStream(schemaFile);
+        logger.info("localFS: inputFile = " + schemaFile);
+      }
 
-        // Parse and load the schema file into a QuerySchema object; place in the schemaMap
-        QuerySchema querySchema = loadQuerySchemaFile(schemaFile, hdfs, fs);
+      try
+      {
+        QuerySchema querySchema = loader.loadSchema(is);
         QuerySchemaRegistry.put(querySchema);
+      } finally
+      {
+        is.close();
       }
     }
   }
 
-  private static QuerySchema loadQuerySchemaFile(String schemaFile, boolean hdfs, FileSystem fs) throws Exception
+  /**
+   * Default constructor.
+   */
+  public QuerySchemaLoader()
   {
-    QuerySchema querySchema;
 
-    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
-    DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+  }
 
-    // Read in and parse the schema file
-    Document doc;
-    if (hdfs)
-    {
-      Path filePath = new Path(schemaFile);
-      doc = dBuilder.parse(fs.open(filePath));
-      logger.info("hdfs: filePath = " + filePath.toString());
-    }
-    else
-    {
-      File inputFile = new File(schemaFile);
-      doc = dBuilder.parse(inputFile);
-      logger.info("localFS: inputFile = " + inputFile.toString());
-    }
-    doc.getDocumentElement().normalize();
-    logger.info("Root element: " + doc.getDocumentElement().getNodeName());
+  /**
+   * Returns the query schema as defined in XML format on the given stream.
+   * 
+   * @param stream
+   *          The source of the XML query schema description.
+   * @return The query schema.
+   * @throws IOException
+   *           A problem occurred reading from the given stream.
+   * @throws PIRException
+   *           The schema description is invalid.
+   */
+  public QuerySchema loadSchema(InputStream stream) throws IOException, PIRException
+  {
+    // Read in and parse the XML file.
+    Document doc = parseXMLDocument(stream);
 
-    // Extract the schemaName
+    // Extract the schemaName.
     String schemaName = extractValue(doc, "schemaName");
     logger.info("schemaName = " + schemaName);
 
-    // Extract the dataSchemaName
+    // Extract the dataSchemaName.
     String dataSchemaName = extractValue(doc, "dataSchemaName");
     logger.info("dataSchemaName = " + dataSchemaName);
 
+    // We must have a matching data schema for this query.
     DataSchema dataSchema = DataSchemaRegistry.get(dataSchemaName);
     if (dataSchema == null)
     {
-      throw new Exception("Loaded DataSchema does not exist for dataSchemaName = " + dataSchemaName);
+      throw new PIRException("Loaded DataSchema does not exist for dataSchemaName = " + dataSchemaName);
     }
 
-    // Extract the selectorName
+    // Extract the selectorName, and ensure it matches an element in the data schema.
     String selectorName = extractValue(doc, "selectorName");
     logger.info("selectorName = " + selectorName);
     if (!dataSchema.containsElement(selectorName))
     {
-      throw new Exception("dataSchema = " + dataSchemaName + " does not contain selectorName
= " + selectorName);
+      throw new PIRException("dataSchema = " + dataSchemaName + " does not contain selectorName
= " + selectorName);
     }
 
-    // Extract the elements
+    // Extract the query elements.
     NodeList elementsList = doc.getElementsByTagName("elements");
-    if (elementsList.getLength() > 1)
+    if (elementsList.getLength() != 1)
     {
-      throw new Exception("elementsList.getLength() = " + elementsList.getLength() + " --
should be 1");
+      throw new PIRException("elementsList.getLength() = " + elementsList.getLength() + "
-- should be 1");
     }
     Element elements = (Element) elementsList.item(0);
 
@@ -169,29 +197,27 @@ public class QuerySchemaLoader
       Node nNode = nList.item(i);
       if (nNode.getNodeType() == Node.ELEMENT_NODE)
       {
-        Element eElement = (Element) nNode;
-
-        // Pull the name and add to the TreeSet
-        String name = eElement.getFirstChild().getNodeValue().trim();
-        elementNames.add(name);
-
-        // Compute the number of bits for this element
-        logger.info("name = " + name);
-        logger.info("partitionerName = " + dataSchema.getPartitionerTypeName(name));
-        if ((dataSchema.getPartitionerForElement(name)) == null)
+        // Pull the name
+        String queryElementName = ((Element) nNode).getFirstChild().getNodeValue().trim();
+        if (!dataSchema.containsElement(queryElementName))
         {
-          logger.info("partitioner is null");
+          throw new PIRException("dataSchema = " + dataSchemaName + " does not contain requested element name = " + queryElementName);
         }
-        int bits = ((DataPartitioner) dataSchema.getPartitionerForElement(name)).getBits(dataSchema.getElementType(name));
+        elementNames.add(queryElementName);
+        logger.info("name = " + queryElementName + " partitionerName = " + dataSchema.getPartitionerTypeName(queryElementName));
 
-        // Multiply by the number of array elements allowed, if applicable
-        if (dataSchema.getArrayElements().contains(name))
+        // Compute the number of bits for this element.
+        DataPartitioner partitioner = dataSchema.getPartitionerForElement(queryElementName);
+        int bits = partitioner.getBits(dataSchema.getElementType(queryElementName));
+
+        // Multiply by the number of array elements allowed, if applicable.
+        if (dataSchema.isArrayElement(queryElementName))
         {
           bits *= Integer.parseInt(SystemConfiguration.getProperty("pir.numReturnArrayElements"));
         }
         dataElementSize += bits;
 
-        logger.info("name = " + name + " bits = " + bits + " dataElementSize = " + dataElementSize);
+        logger.info("name = " + queryElementName + " bits = " + bits + " dataElementSize = " + dataElementSize);
       }
     }
 
@@ -202,62 +228,88 @@ public class QuerySchemaLoader
       filterTypeName = doc.getElementsByTagName("filter").item(0).getTextContent().trim();
     }
 
-    // Extract the filterNames, if they exist
-    HashSet<String> filterNamesSet = new HashSet<>();
-    if (doc.getElementsByTagName("filterNames").item(0) != null)
+    // Create a filter over the query elements.
+    Set<String> filteredNamesSet = extractFilteredElementNames(doc);
+    DataFilter filter = instantiateFilter(filterTypeName, filteredNamesSet);
+
+    // Create and return the query schema object.
+    QuerySchema querySchema = new QuerySchema(schemaName, dataSchemaName, selectorName, filterTypeName, filter, dataElementSize);
+    querySchema.getElementNames().addAll(elementNames);
+    querySchema.getFilteredElementNames().addAll(filteredNamesSet);
+    return querySchema;
+  }
+
+  /*
+   * Parses and normalizes the XML document available on the given stream.
+   */
+  private Document parseXMLDocument(InputStream stream) throws IOException, PIRException
+  {
+    Document doc;
+    try
+    {
+      DocumentBuilder dBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+      doc = dBuilder.parse(stream);
+    } catch (ParserConfigurationException | SAXException e)
+    {
+      throw new PIRException("Schema parsing error", e);
+    }
+    doc.getDocumentElement().normalize();
+    logger.info("Root element: " + doc.getDocumentElement().getNodeName());
+
+    return doc;
+  }
+
+  /*
+   * Returns the possibly empty set of element names over which the filter is applied, maintaining document order.
+   */
+  private Set<String> extractFilteredElementNames(Document doc) throws PIRException
+  {
+    HashSet<String> filteredNamesSet = new HashSet<>();
+
+    NodeList filterNamesList = doc.getElementsByTagName("filterNames");
+    if (filterNamesList.getLength() != 0)
     {
-      NodeList filterNamesList = doc.getElementsByTagName("filterNames");
       if (filterNamesList.getLength() > 1)
       {
-        throw new Exception("filterNamesList.getLength() = " + filterNamesList.getLength()
+ " -- should be 1");
+        throw new PIRException("filterNamesList.getLength() = " + filterNamesList.getLength()
+ " -- should be 0 or 1");
       }
-      Element filterNames = (Element) filterNamesList.item(0);
 
-      NodeList filterNList = filterNames.getElementsByTagName("name");
+      // Extract element names from the list.
+      NodeList filterNList = ((Element) filterNamesList.item(0)).getElementsByTagName("name");
       for (int i = 0; i < filterNList.getLength(); i++)
       {
         Node nNode = filterNList.item(i);
         if (nNode.getNodeType() == Node.ELEMENT_NODE)
         {
-          Element eElement = (Element) nNode;
-
-          // Pull the name and add to the TreeSet
-          String name = eElement.getFirstChild().getNodeValue().trim();
-          filterNamesSet.add(name);
+          // Pull the name and add to the set.
+          String name = ((Element) nNode).getFirstChild().getNodeValue().trim();
+          filteredNamesSet.add(name);
 
           logger.info("filterName = " + name);
         }
       }
     }
-
-    // Create the query schema object
-
-    DataFilter filter = instantiateFilter(filterTypeName, filterNamesSet);
-    querySchema = new QuerySchema(schemaName, dataSchemaName, selectorName, filterTypeName, filter, dataElementSize);
-    querySchema.getElementNames().addAll(elementNames);
-    querySchema.getFilteredElementNames().addAll(filterNamesSet);
-    return querySchema;
+    return filteredNamesSet;
   }
 
-  /**
-   * Extracts a top level, single value from the xml structure
+  /*
+   * Extracts a top level, single value from the XML structure.
+   * 
+   * Throws an exception if there is not exactly one tag with the given name.
    */
-  private static String extractValue(Document doc, String valueName) throws Exception
+  private String extractValue(Document doc, String tagName) throws PIRException
   {
-    NodeList itemList = doc.getElementsByTagName(valueName);
-    if (itemList.getLength() > 1)
+    NodeList itemList = doc.getElementsByTagName(tagName);
+    if (itemList.getLength() != 1)
     {
-      throw new Exception("itemList.getLength() = " + itemList.getLength() + " -- should
be 1");
+      throw new PIRException("itemList.getLength() = " + itemList.getLength() + " -- should
be 1");
     }
     return itemList.item(0).getTextContent().trim();
   }
 
-  private static DataFilter instantiateFilter(String filterTypeName, Set<String> filteredElementNames) throws Exception
+  private DataFilter instantiateFilter(String filterTypeName, Set<String> filteredElementNames) throws IOException, PIRException
   {
-    if (!filterTypeName.equals(NO_FILTER))
-    {
-      return FilterFactory.getFilter(filterTypeName, filteredElementNames);
-    }
-    return null;
+    return filterTypeName.equals(NO_FILTER) ? null : FilterFactory.getFilter(filterTypeName, filteredElementNames);
   }
 }
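
Note the ordering constraint now checked in loadSchema(): dataSchemaName must resolve through DataSchemaRegistry, so a query schema can only be loaded after the data schema it references has been registered. A sketch with hypothetical file names:

    // Register the data schema first...
    DataSchemaLoader dataLoader = new DataSchemaLoader();
    try (InputStream is = new FileInputStream("my-data-schema.xml"))
    {
      DataSchemaRegistry.put(dataLoader.loadSchema(is));
    }

    // ...then load the query schema that references it.
    QuerySchemaLoader queryLoader = new QuerySchemaLoader();
    try (InputStream is = new FileInputStream("my-query-schema.xml"))
    {
      QuerySchemaRegistry.put(queryLoader.loadSchema(is));
    }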

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java b/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java
index bfef7e3..c44e1e8 100644
--- a/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java
+++ b/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java
@@ -21,6 +21,7 @@ package org.apache.pirk.schema.query.filter;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.HashSet;
 import java.util.Set;
@@ -28,6 +29,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 
 /**
@@ -35,7 +37,7 @@ import org.apache.pirk.utils.SystemConfiguration;
  */
 public class FilterFactory
 {
-  public static DataFilter getFilter(String filterName, Set<String> filteredElementNames) throws Exception
+  public static DataFilter getFilter(String filterName, Set<String> filteredElementNames) throws IOException, PIRException
   {
     Object obj = null;
 
@@ -72,11 +74,14 @@ public class FilterFactory
     else
     {
       // Instantiate and validate the interface implementation
-      Class c = Class.forName(filterName);
-      obj = c.newInstance();
-      if (!(obj instanceof DataFilter))
+      try
       {
-        throw new Exception("filterName = " + filterName + " DOES NOT implement the DataFilter
interface");
+        @SuppressWarnings("unchecked")
+        Class<? extends DataFilter> c = (Class<? extends DataFilter>) Class.forName(filterName);
+        obj = c.newInstance();
+      } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | ClassCastException e)
+      {
+        throw new PIRException("filterName = " + filterName + " cannot be instantiated or
does not implement DataFilter interface");
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/resources/query-schema.xsd
----------------------------------------------------------------------
diff --git a/src/main/resources/query-schema.xsd b/src/main/resources/query-schema.xsd
index a0657e4..65a36ce 100644
--- a/src/main/resources/query-schema.xsd
+++ b/src/main/resources/query-schema.xsd
@@ -22,24 +22,93 @@
     <xs:element name="schema">
         <xs:complexType>
             <xs:sequence>
-                <xs:element name="schemaName" type="xs:string" />
-                <xs:element name="dataSchemaName" type="xs:string" />
-                <xs:element name="selectorName" type="xs:string" />
+                <xs:element name="schemaName" type="xs:string">
+                    <xs:annotation>
+                        <xs:documentation>
+                            The name of the query schema.
+                            The name omits leading and trailing
+                            whitespace, and is case sensitive.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+
+                <xs:element name="dataSchemaName" type="xs:string">
+                    <xs:annotation>
+                        <xs:documentation>
+                            The name of the data schema
+                            over which this query is run. The name omits
+                            leading and trailing whitespace, and is case
+                            sensitive.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+
+                <xs:element name="selectorName" type="xs:string">
+                    <xs:annotation>
+                        <xs:documentation>The name of the
+                            element in the data schema that will be the
+                            selector for this query.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+
                 <xs:element name="elements">
+                    <xs:annotation>
+                        <xs:documentation>
+                            The set of element names to
+                            include in the query response.
+                        </xs:documentation>
+                    </xs:annotation>
                     <xs:complexType>
                         <xs:sequence>
+
                             <xs:element name="name" type="xs:string"
-                                maxOccurs="unbounded" />
+                                maxOccurs="unbounded">
+                                <xs:annotation>
+                                    <xs:documentation>
+                                        The name of an
+                                        element in the data schema to
+                                        include in the query response.
+                                    </xs:documentation>
+                                </xs:annotation>
+                            </xs:element>
+
                         </xs:sequence>
                     </xs:complexType>
                 </xs:element>
+
                 <xs:element name="filter" type="xs:string"
-                    minOccurs="0" />
+                    minOccurs="0">
+                    <xs:annotation>
+                        <xs:documentation>
+                            The name of a class used to
+                            filter the query response data.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+
                 <xs:element name="filterNames" minOccurs="0"
                     maxOccurs="unbounded">
+                    <xs:annotation>
+                        <xs:documentation>
+                            The set of data element names
+                            over which the
+                            response filter is applied.
+                        </xs:documentation>
+                    </xs:annotation>
                     <xs:complexType>
                         <xs:sequence>
-                            <xs:element name="name" type="xs:string" />
+
+                            <xs:element name="name" type="xs:string">
+                                <xs:annotation>
+                                    <xs:documentation>
+                                        The name of an
+                                        element in the data schema over
+                                        which to apply the filter.
+                                    </xs:documentation>
+                                </xs:annotation>
+                            </xs:element>
+
                         </xs:sequence>
                     </xs:complexType>
                 </xs:element>

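For reference, an instance document that validates against the annotated schema above could look like the following. All names here are illustrative, and StopListFilter is shown only as an example filter class:

    <?xml version="1.0" encoding="UTF-8"?>
    <schema>
        <schemaName>dns-ip-query</schemaName>
        <dataSchemaName>dns-data</dataSchemaName>
        <selectorName>qname</selectorName>
        <elements>
            <name>qname</name>
            <name>ip</name>
        </elements>
        <filter>org.apache.pirk.schema.query.filter.StopListFilter</filter>
        <filterNames>
            <name>qname</name>
        </filterNames>
    </schema>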

