pirk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eawilli...@apache.org
Subject incubator-pirk git commit: PIRK-25 Serialization and storage changes to Querier, Query, and Response - closes apache/incubator-pirk#18
Date Sat, 23 Jul 2016 23:35:03 GMT
Repository: incubator-pirk
Updated Branches:
  refs/heads/master ef8d1c1a5 -> 442b21790


PIRK-25 Serialization and storage changes to Querier, Query, and Response - closes apache/incubator-pirk#18


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/442b2179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/442b2179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/442b2179

Branch: refs/heads/master
Commit: 442b21790d16bf185b6bd8b56d649ae79b35ade0
Parents: ef8d1c1
Author: tellison <tellison@apache.org>
Authored: Sat Jul 23 19:34:45 2016 -0400
Committer: eawilliams <eawilliams@apache.org>
Committed: Sat Jul 23 19:34:45 2016 -0400

----------------------------------------------------------------------
 .../apache/pirk/querier/wideskies/Querier.java  |  91 +-----------
 .../pirk/querier/wideskies/QuerierDriver.java   |   9 +-
 .../querier/wideskies/encrypt/EncryptQuery.java |  34 -----
 .../org/apache/pirk/query/wideskies/Query.java  | 148 +------------------
 .../responder/wideskies/ResponderDriver.java    |   3 +-
 .../wideskies/mapreduce/ColumnMultReducer.java  |   3 +-
 .../mapreduce/ComputeResponseTool.java          |   5 +-
 .../wideskies/mapreduce/ExpTableMapper.java     |   5 +-
 .../mapreduce/FinalResponseReducer.java         |   9 +-
 .../HashSelectorsAndPartitionDataMapper.java    |   4 +-
 .../wideskies/mapreduce/RowCalcReducer.java     |   3 +-
 .../wideskies/spark/ComputeExpLookupTable.java  |  10 +-
 .../wideskies/spark/ComputeResponse.java        |  16 +-
 .../wideskies/standalone/Responder.java         |   3 +-
 .../pirk/response/wideskies/Response.java       | 134 +----------------
 .../serialization/HadoopFileSystemStore.java    |  94 ++++++++++++
 .../pirk/serialization/JavaSerializer.java      |  49 ++++++
 .../pirk/serialization/JsonSerializer.java      |  41 +++++
 .../serialization/LocalFileSystemStore.java     |  82 ++++++++++
 .../serialization/SerializationService.java     |  49 ++++++
 .../org/apache/pirk/serialization/Storable.java |  25 ++++
 .../pirk/serialization/StorageService.java      |  39 +++++
 .../distributed/testsuite/DistTestSuite.java    |   5 +-
 .../apache/pirk/test/utils/StandaloneQuery.java |  13 +-
 24 files changed, 448 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
index 7ffc7a0..4d6523d 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
@@ -1,4 +1,4 @@
-/*
+/*******************************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,15 +15,9 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- */
+ *******************************************************************************/
 package org.apache.pirk.querier.wideskies;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,6 +25,7 @@ import java.util.HashMap;
 import org.apache.pirk.encryption.Paillier;
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.serialization.Storable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * Class to hold the information necessary for the PIR querier to perform decryption
  * 
  */
-public class Querier implements Serializable
+public class Querier implements Serializable, Storable
 {
   private static final long serialVersionUID = 1L;
 
@@ -95,82 +90,4 @@ public class Querier implements Serializable
   {
     return embedSelectorMap;
   }
-
-  /**
-   * Method to serialize the Querier object to a file
-   */
-  public void writeToFile(String filename) throws IOException
-  {
-    writeToFile(new File(filename));
-  }
-
-  /**
-   * Method to serialize the Querier object to a file
-   */
-  public void writeToFile(File file) throws IOException
-  {
-    ObjectOutputStream oos = null;
-    FileOutputStream fout = null;
-    try
-    {
-      fout = new FileOutputStream(file, true);
-      oos = new ObjectOutputStream(fout);
-      oos.writeObject(this);
-    } catch (Exception ex)
-    {
-      ex.printStackTrace();
-    } finally
-    {
-      if (oos != null)
-      {
-        oos.close();
-      }
-      if (fout != null)
-      {
-        fout.close();
-      }
-    }
-  }
-
-  /**
-   * Reconstruct the Querier object from its file serialization
-   */
-  public static Querier readFromFile(String filename) throws IOException
-  {
-
-    return readFromFile(new File(filename));
-  }
-
-  /**
-   * Reconstruct the Querier object from its file serialization
-   */
-  public static Querier readFromFile(File file) throws IOException
-  {
-    Querier querier = null;
-
-    FileInputStream fIn = null;
-    ObjectInputStream oIn;
-    try
-    {
-      fIn = new FileInputStream(file);
-      oIn = new ObjectInputStream(fIn);
-      querier = (Querier) oIn.readObject();
-    } catch (IOException | ClassNotFoundException e)
-    {
-      e.printStackTrace();
-    } finally
-    {
-      if (fIn != null)
-      {
-        try
-        {
-          fIn.close();
-        } catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-    }
-    return querier;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
index 01a6c86..8f287fd 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
@@ -29,6 +29,7 @@ import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.response.wideskies.Response;
 import org.apache.pirk.schema.query.LoadQuerySchemas;
+import org.apache.pirk.serialization.LocalFileSystemStore;
 import org.apache.pirk.utils.FileIOUtils;
 import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
@@ -82,6 +83,7 @@ public class QuerierDriver implements Serializable
     String outputFile;
     String queryType = null;
     int numThreads;
+    LocalFileSystemStore storage = new LocalFileSystemStore();
 
     // Encryption variables
     int hashBitSize = 0;
@@ -182,14 +184,15 @@ public class QuerierDriver implements Serializable
       // Write necessary output files - two files written -
       // (1) Querier object to <outputFile>-QuerierConst.QUERIER_FILETAG
       // (2) Query object to <outputFile>-QuerierConst.QUERY_FILETAG
-      encryptQuery.writeOutputFiles(outputFile);
+      storage.store(outputFile + "-" + QuerierConst.QUERIER_FILETAG, encryptQuery.getQuerier());
+      storage.store(outputFile + "-" + QuerierConst.QUERY_FILETAG, encryptQuery.getQuery());
     }
     else
     // Decryption
     {
       // Reconstruct the necessary objects from the files
-      Response response = Response.readFromFile(inputFile);
-      Querier querier = Querier.readFromFile(querierFile);
+      Response response = storage.recall(inputFile, Response.class);
+      Querier querier = storage.recall(querierFile, Querier.class);
 
       // Perform decryption and output the result file
       DecryptResponse decryptResponse = new DecryptResponse(response, querier);

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
index 87ee9d9..a277c46 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
@@ -258,38 +258,4 @@ public class EncryptQuery
     }
     logger.info("Completed creation of encrypted query vectors");
   }
-
-  /**
-   * Creates two output files - two files written:
-   * <p>
-   * (1) Querier object to <filePrefix>-QuerierConst.QUERIER_FILETAG
-   * <p>
-   * (2) Query object to <filePrefix>-QuerierConst.QUERY_FILETAG
-   */
-  public void writeOutputFiles(String filePrefix) throws IOException
-  {
-    // Write the Querier object
-    querier.writeToFile(filePrefix + "-" + QuerierConst.QUERIER_FILETAG);
-
-    // Write the Query object
-    query.writeToFile(filePrefix + "-" + QuerierConst.QUERY_FILETAG);
-  }
-
-  /**
-   * Creates two output files - two files written:
-   * <p>
-   * (1) Querier object to <filePrefix>-QuerierConst.QUERIER_FILETAG
-   * <p>
-   * (2) Query object to <filePrefix>-QuerierConst.QUERY_FILETAG
-   * <p>
-   * This method is used for functional testing
-   */
-  public void writeOutputFiles(File fileQuerier, File fileQuery) throws IOException
-  {
-    // Write the Querier object
-    querier.writeToFile(fileQuerier);
-
-    // Write the Query object
-    query.writeToFile(fileQuery);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/query/wideskies/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/query/wideskies/Query.java b/src/main/java/org/apache/pirk/query/wideskies/Query.java
index ebaafbb..2035d4b 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/Query.java
+++ b/src/main/java/org/apache/pirk/query/wideskies/Query.java
@@ -18,12 +18,6 @@
  */
 package org.apache.pirk.query.wideskies;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -33,10 +27,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.pirk.encryption.ModPowAbstraction;
 import org.apache.pirk.querier.wideskies.encrypt.ExpTableRunnable;
+import org.apache.pirk.serialization.Storable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * Class to hold the PIR query vectors
  *
  */
-public class Query implements Serializable
+public class Query implements Serializable, Storable
 {
   private static final long serialVersionUID = 1L;
 
@@ -225,141 +218,4 @@ public class Query implements Serializable
   {
     return expTable.get(value).get(power);
   }
-
-  public void writeToFile(String filename) throws IOException
-  {
-    writeToFile(new File(filename));
-  }
-
-  public void writeToFile(File file) throws IOException
-  {
-    ObjectOutputStream oos = null;
-    FileOutputStream fout = null;
-    try
-    {
-      fout = new FileOutputStream(file, true);
-      oos = new ObjectOutputStream(fout);
-      oos.writeObject(this);
-    } catch (Exception ex)
-    {
-      ex.printStackTrace();
-    } finally
-    {
-      if (oos != null)
-      {
-        oos.close();
-      }
-      if (fout != null)
-      {
-        fout.close();
-      }
-    }
-  }
-
-  /**
-   * Reconstruct the Query object from its file serialization
-   */
-  public static Query readFromFile(String filename) throws IOException
-  {
-
-    return readFromFile(new File(filename));
-  }
-
-  /**
-   * Reconstruct the Query object from its file serialization
-   */
-  public static Query readFromFile(File file) throws IOException
-  {
-    Query query = null;
-
-    FileInputStream fIn = null;
-    ObjectInputStream oIn;
-    try
-    {
-      fIn = new FileInputStream(file);
-      oIn = new ObjectInputStream(fIn);
-      query = (Query) oIn.readObject();
-    } catch (IOException | ClassNotFoundException e)
-    {
-      e.printStackTrace();
-    } finally
-    {
-      if (fIn != null)
-      {
-        try
-        {
-          fIn.close();
-        } catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-    }
-    return query;
-  }
-
-  /**
-   * Method to write the Query object to a file in HDFS
-   *
-   */
-  public void writeToHDFSFile(Path fileName, FileSystem fs)
-  {
-
-    ObjectOutputStream oos = null;
-    try
-    {
-      oos = new ObjectOutputStream(fs.create(fileName));
-      oos.writeObject(this);
-      oos.close();
-    } catch (IOException e)
-    {
-      e.printStackTrace();
-    } finally
-    {
-      if (oos != null)
-      {
-        try
-        {
-          oos.close();
-        } catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-    }
-  }
-
-  /**
-   * Method to reconstruct the Query object from its file serialization in HDFS
-   */
-  public static Query readFromHDFSFile(Path filename, FileSystem fs)
-  {
-    Query pirWLQuery = null;
-
-    ObjectInputStream ois = null;
-    try
-    {
-      ois = new ObjectInputStream(fs.open(filename));
-      pirWLQuery = (Query) ois.readObject();
-      ois.close();
-
-    } catch (IOException | ClassNotFoundException e1)
-    {
-      e1.printStackTrace();
-    } finally
-    {
-      if (ois != null)
-      {
-        try
-        {
-          ois.close();
-        } catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-    }
-
-    return pirWLQuery;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index 4cd6b5f..61dbb23 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -25,6 +25,7 @@ import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
 import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
 import org.apache.pirk.responder.wideskies.standalone.Responder;
+import org.apache.pirk.serialization.LocalFileSystemStore;
 import org.apache.pirk.utils.SystemConfiguration;
 
 /**
@@ -65,7 +66,7 @@ public class ResponderDriver
       System.out.println("Launching Standalone Responder:");
 
       String queryInput = SystemConfiguration.getProperty("pir.queryInput");
-      Query query = Query.readFromFile(queryInput);
+      Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
 
       Responder pirResponder = new Responder(query);
       pirResponder.computeStandaloneResponse();

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
index abffadf..df3b7d0 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.FileConst;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class ColumnMultReducer extends Reducer<LongWritable,Text,LongWritable,Te
 
     FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
-    query = Query.readFromHDFSFile(new Path(queryDir), fs);
+    query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index fb3027b..6eab9fe 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -49,6 +49,7 @@ import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.schema.data.LoadDataSchemas;
 import org.apache.pirk.schema.query.LoadQuerySchemas;
 import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.FileConst;
 import org.apache.pirk.utils.HDFS;
 import org.apache.pirk.utils.SystemConfiguration;
@@ -127,7 +128,7 @@ public class ComputeResponseTool extends Configured implements Tool
     LoadDataSchemas.initialize(true, fs);
     LoadQuerySchemas.initialize(true, fs);
 
-    query = Query.readFromHDFSFile(new Path(queryInputDir), fs);
+    query = new HadoopFileSystemStore(fs).recall(queryInputDir, Query.class);
     queryInfo = query.getQueryInfo();
     qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
 
@@ -331,7 +332,7 @@ public class ComputeResponseTool extends Configured implements Tool
 
     // Place exp table in query object
     query.setExpFileBasedLookup(expFileTable);
-    query.writeToHDFSFile(new Path(queryInputDir), fs);
+    new HadoopFileSystemStore(fs).store(queryInputDir, query);
 
     logger.info("Completed creation of expTable");
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
index 28d49a3..c53acdc 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
@@ -22,12 +22,12 @@ import java.io.IOException;
 import java.math.BigInteger;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.pirk.encryption.ModPowAbstraction;
 import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,9 +52,8 @@ public class ExpTableMapper extends Mapper<LongWritable,Text,Text,Text>
 
     valueOut = new Text();
 
-    FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
-    query = Query.readFromHDFSFile(new Path(queryDir), fs);
+    query = new HadoopFileSystemStore(FileSystem.newInstance(ctx.getConfiguration())).recall(queryDir, Query.class);
 
     int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize();
     maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1;

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
index 1df7b0e..8f7cbe8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.math.BigInteger;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -30,6 +29,7 @@ import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +46,8 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable
   private Response response = null;
   private String outputFile = null;
   private FileSystem fs = null;
+  private HadoopFileSystemStore storage = null;
+  private QueryInfo queryInfo = null;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException
@@ -56,8 +58,9 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable
     mos = new MultipleOutputs<>(ctx);
 
     fs = FileSystem.newInstance(ctx.getConfiguration());
+    storage = new HadoopFileSystemStore(fs);
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
-    Query query = Query.readFromHDFSFile(new Path(queryDir), fs);
+    Query query = storage.recall(queryDir, Query.class);
     QueryInfo queryInfo = query.getQueryInfo();
 
     outputFile = ctx.getConfiguration().get("pirMR.outputFile");
@@ -83,7 +86,7 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable
   @Override
   public void cleanup(Context ctx) throws IOException, InterruptedException
   {
-    response.writeToHDFSFile(new Path(outputFile), fs);
+    storage.store(outputFile, response);
     mos.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
index 95396a9..b04babd 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
@@ -36,6 +36,7 @@ import org.apache.pirk.schema.data.LoadDataSchemas;
 import org.apache.pirk.schema.query.LoadQuerySchemas;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.filter.DataFilter;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.StringUtils;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.slf4j.Logger;
@@ -57,6 +58,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable
 
   HashSet<String> stopList = null;
 
+  private Query query = null;
   private QueryInfo queryInfo = null;
   private QuerySchema qSchema = null;
   private DataSchema dSchema = null;
@@ -75,7 +77,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable
 
     // Can make this so that it reads multiple queries at one time...
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
-    Query query = Query.readFromHDFSFile(new Path(queryDir), fs);
+    query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
     queryInfo = query.getQueryInfo();
 
     try

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
index e35ee84..ea57d2d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
@@ -37,6 +37,7 @@ import org.apache.pirk.schema.data.DataSchema;
 import org.apache.pirk.schema.data.LoadDataSchemas;
 import org.apache.pirk.schema.query.LoadQuerySchemas;
 import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.FileConst;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.slf4j.Logger;
@@ -79,7 +80,7 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW
 
     fs = FileSystem.newInstance(ctx.getConfiguration());
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
-    query = Query.readFromHDFSFile(new Path(queryDir), fs);
+    query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
     queryInfo = query.getQueryInfo();
 
     try

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
index 938c32e..2feeca8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
@@ -28,12 +28,14 @@ import java.util.TreeMap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Tuple2;
 
 /**
@@ -100,7 +102,13 @@ public class ComputeExpLookupTable
       // Place exp table in query object and in the BroadcastVars
       Map<Integer,String> queryHashFileNameMap = hashToPartition.collectAsMap();
       query.setExpFileBasedLookup(new HashMap<>(queryHashFileNameMap));
-      query.writeToHDFSFile(new Path(queryInputFile), fs);
+      try
+      {
+        new HadoopFileSystemStore(fs).store(queryInputFile, query);
+      } catch (IOException e)
+      {
+        e.printStackTrace();
+      }
       bVars.setQuery(query);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index ba7fd12..c6b0d28 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pirk.responder.wideskies.spark;
 
+import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -36,6 +36,7 @@ import org.apache.pirk.response.wideskies.Response;
 import org.apache.pirk.schema.data.LoadDataSchemas;
 import org.apache.pirk.schema.query.LoadQuerySchemas;
 import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.SparkConf;
@@ -45,6 +46,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.elasticsearch.hadoop.mr.EsInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Tuple2;
 
 /**
@@ -77,6 +79,7 @@ public class ComputeResponse
   private boolean useModExpJoin = false;
 
   private FileSystem fs = null;
+  private HadoopFileSystemStore storage = null;
   private JavaSparkContext sc = null;
 
   private Accumulators accum = null;
@@ -93,6 +96,7 @@ public class ComputeResponse
   public ComputeResponse(FileSystem fileSys) throws Exception
   {
     fs = fileSys;
+    storage = new HadoopFileSystemStore(fs);
 
     dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
     if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat))
@@ -162,7 +166,7 @@ public class ComputeResponse
     bVars = new BroadcastVars(sc);
 
     // Set the Query and QueryInfo broadcast variables
-    query = Query.readFromHDFSFile(new Path(queryInput), fs);
+    query = storage.recall(queryInput, Query.class);
     queryInfo = query.getQueryInfo();
     bVars.setQuery(query);
     bVars.setQueryInfo(queryInfo);
@@ -366,7 +370,13 @@ public class ComputeResponse
       logger.debug("colNum = " + colVal + " column = " + encColResults.get(colVal).toString());
     }
 
-    response.writeToHDFSFile(new Path(outputFile), fs);
+    try
+    {
+      storage.store(outputFile, response);
+    } catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
     accum.printAll();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
index 879b618..4ac3923 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
@@ -30,6 +30,7 @@ import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.query.wideskies.QueryUtils;
 import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.LocalFileSystemStore;
 import org.apache.pirk.utils.KeyedHash;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.json.simple.JSONObject;
@@ -126,7 +127,7 @@ public class Responder
     // Set the response object, extract, write to file
     String outputFile = SystemConfiguration.getProperty("pir.outputFile");
     setResponseElements();
-    response.writeToFile(outputFile);
+    new LocalFileSystemStore().store(outputFile, response);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/response/wideskies/Response.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java
index 3d2a3c0..667b8a3 100644
--- a/src/main/java/org/apache/pirk/response/wideskies/Response.java
+++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java
@@ -18,19 +18,12 @@
  */
 package org.apache.pirk.response.wideskies;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.TreeMap;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.serialization.Storable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * Serialized and returned to the querier for decryption
  * 
  */
-public class Response implements Serializable
+public class Response implements Serializable, Storable
 {
   private static final long serialVersionUID = 1L;
 
@@ -76,127 +69,4 @@ public class Response implements Serializable
   {
     responseElements.put(position, element);
   }
-
-  public void writeToFile(String filename) throws IOException
-  {
-    writeToFile(new File(filename));
-  }
-
-  public void writeToFile(File file) throws IOException
-  {
-    ObjectOutputStream oos = null;
-    FileOutputStream fout = null;
-    try
-    {
-      fout = new FileOutputStream(file, true);
-      oos = new ObjectOutputStream(fout);
-      oos.writeObject(this);
-    } catch (Exception ex)
-    {
-      ex.printStackTrace();
-    } finally
-    {
-      if (oos != null)
-      {
-        oos.close();
-      }
-      if (fout != null)
-      {
-        fout.close();
-      }
-    }
-  }
-
-  public void writeToHDFSFile(Path fileName, FileSystem fs)
-  {
-
-    ObjectOutputStream oos = null;
-    try
-    {
-      oos = new ObjectOutputStream(fs.create(fileName));
-      oos.writeObject(this);
-      oos.close();
-    } catch (IOException e)
-    {
-      e.printStackTrace();
-    } finally
-    {
-      if (oos != null)
-      {
-        try
-        {
-          oos.close();
-        } catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-    }
-  }
-
-  public static Response readFromFile(String filename) throws IOException
-  {
-    return readFromFile(new File(filename));
-  }
-
-  public static Response readFromFile(File file) throws IOException
-  {
-    Response response = null;
-
-    ObjectInputStream objectinputstream = null;
-    FileInputStream streamIn = null;
-    try
-    {
-      streamIn = new FileInputStream(file);
-      objectinputstream = new ObjectInputStream(streamIn);
-      response = (Response) objectinputstream.readObject();
-
-    } catch (Exception e)
-    {
-      e.printStackTrace();
-    } finally
-    {
-      if (objectinputstream != null)
-      {
-        objectinputstream.close();
-      }
-      if (streamIn != null)
-      {
-        streamIn.close();
-      }
-    }
-
-    return response;
-  }
-
-  // Used for testing
-  public static Response readFromHDFSFile(Path file, FileSystem fs) throws IOException
-  {
-    Response response = null;
-
-    ObjectInputStream ois = null;
-    try
-    {
-      ois = new ObjectInputStream(fs.open(file));
-      response = (Response) ois.readObject();
-      ois.close();
-
-    } catch (IOException | ClassNotFoundException e1)
-    {
-      e1.printStackTrace();
-    } finally
-    {
-      if (ois != null)
-      {
-        try
-        {
-          ois.close();
-        } catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-    }
-    return response;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java b/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java
new file mode 100644
index 0000000..7e1e475
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java
@@ -0,0 +1,94 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HadoopFileSystemStore extends StorageService
+{
+
+  private FileSystem hadoopFileSystem;
+
+  // Prevents others from using default constructor.
+  HadoopFileSystemStore()
+  {
+    super();
+  }
+
+  /**
+   * Creates a new storage service on the given HDFS file system using default Java serialization.
+   */
+  public HadoopFileSystemStore(FileSystem fs)
+  {
+    super();
+    hadoopFileSystem = fs;
+  }
+
+  public HadoopFileSystemStore(FileSystem fs, SerializationService serial)
+  {
+    super(serial);
+    hadoopFileSystem = fs;
+  }
+
+  public void store(String pathName, Storable value) throws IOException
+  {
+    store(new Path(pathName), value);
+  }
+
+  public void store(Path path, Storable obj) throws IOException
+  {
+    OutputStream os = hadoopFileSystem.create(path);
+    try
+    {
+      serializer.write(os, obj);
+    } finally
+    {
+      if (os != null)
+      {
+        os.close();
+      }
+    }
+  }
+
+  public <T> T recall(String pathName, Class<T> type) throws IOException
+  {
+    return recall(new Path(pathName), type);
+  }
+
+  public <T> T recall(Path path, Class<T> type) throws IOException
+  {
+    InputStream is = hadoopFileSystem.open(path);
+    try
+    {
+      return serializer.read(is, type);
+    } finally
+    {
+      if (is != null)
+      {
+        is.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/JavaSerializer.java b/src/main/java/org/apache/pirk/serialization/JavaSerializer.java
new file mode 100644
index 0000000..4228c19
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/JavaSerializer.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+public class JavaSerializer extends SerializationService
+{
+
+  public void write(OutputStream stream, Storable obj) throws IOException
+  {
+    ObjectOutputStream oos = new ObjectOutputStream(stream);
+    oos.writeObject(obj);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> T read(InputStream stream, Class<T> type) throws IOException
+  {
+    ObjectInputStream oin = new ObjectInputStream(stream);
+    try
+    {
+      return (T) oin.readObject();
+    } catch (ClassNotFoundException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/JsonSerializer.java b/src/main/java/org/apache/pirk/serialization/JsonSerializer.java
new file mode 100644
index 0000000..c33366d
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/JsonSerializer.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+//TODO: Waiting for Jackson adoption
+public class JsonSerializer extends SerializationService
+{
+
+  @Override
+  public void write(OutputStream w, Storable obj) throws IOException
+  {
+    throw new RuntimeException("Not yet implemented");
+  }
+
+  @Override
+  public <T> T read(InputStream stream, Class<T> type) throws IOException
+  {
+    throw new RuntimeException("Not yet implemented");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java b/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java
new file mode 100644
index 0000000..50d11c3
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class LocalFileSystemStore extends StorageService
+{
+
+  /**
+   * Creates a new storage service on the local file system using default Java serialization.
+   */
+  public LocalFileSystemStore()
+  {
+    super();
+  }
+
+  public LocalFileSystemStore(SerializationService serial)
+  {
+    super(serial);
+  }
+
+  public void store(String path, Storable obj) throws IOException
+  {
+    store(new File(path), obj);
+  }
+
+  public void store(File file, Storable obj) throws IOException
+  {
+    FileOutputStream fos = new FileOutputStream(file);
+    try
+    {
+      serializer.write(fos, obj);
+    } finally
+    {
+      if (fos != null)
+      {
+        fos.close();
+      }
+    }
+  }
+
+  public <T> T recall(String path, Class<T> type) throws IOException
+  {
+    return recall(new File(path), type);
+  }
+
+  public <T> T recall(File file, Class<T> type) throws IOException
+  {
+    FileInputStream fis = new FileInputStream(file);
+    try
+    {
+      return serializer.read(fis, type);
+    } finally
+    {
+      if (fis != null)
+      {
+        fis.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/SerializationService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/SerializationService.java b/src/main/java/org/apache/pirk/serialization/SerializationService.java
new file mode 100644
index 0000000..2764fc8
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/SerializationService.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/*
+ * Ability to read and write objects to/from a stream.
+ */
+public abstract class SerializationService
+{
+  public abstract <T> T read(InputStream stream, Class<T> type) throws IOException;
+
+  public abstract void write(OutputStream w, Storable obj) throws IOException;
+
+  public byte[] toBytes(Storable obj)
+  {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+    try
+    {
+      write(bos, obj);
+    } catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+
+    return bos.toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/Storable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/Storable.java b/src/main/java/org/apache/pirk/serialization/Storable.java
new file mode 100644
index 0000000..d9e2fb3
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/Storable.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+// Marker interface
+public interface Storable
+{
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/StorageService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/StorageService.java b/src/main/java/org/apache/pirk/serialization/StorageService.java
new file mode 100644
index 0000000..775a313
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/StorageService.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+abstract class StorageService
+{
+  SerializationService serializer;
+
+  StorageService()
+  {
+    this.setSerializer(new JavaSerializer());
+  }
+
+  StorageService(SerializationService service)
+  {
+    this.setSerializer(service);
+  }
+
+  public void setSerializer(SerializationService service)
+  {
+    serializer = service;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
index 37cb43c..020d464 100644
--- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
+++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
@@ -36,6 +36,7 @@ import org.apache.pirk.responder.wideskies.ResponderCLI;
 import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
 import org.apache.pirk.response.wideskies.Response;
 import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.test.distributed.DistributedTestDriver;
 import org.apache.pirk.test.utils.BaseTests;
 import org.apache.pirk.test.utils.Inputs;
@@ -342,7 +343,7 @@ public class DistTestSuite
 
     // Write the Querier object to a file
     Path queryInputDirPath = new Path(queryInputDir);
-    query.writeToHDFSFile(queryInputDirPath, fs);
+    new HadoopFileSystemStore(fs).store(queryInputDirPath, query);
     fs.deleteOnExit(queryInputDirPath);
 
     // Grab the original data and query schema properties to reset upon completion
@@ -413,7 +414,7 @@ public class DistTestSuite
     // Perform decryption
     // Reconstruct the necessary objects from the files
     logger.info("Performing decryption; writing final results file");
-    Response response = Response.readFromHDFSFile(new Path(outputFile), fs);
+    Response response = new HadoopFileSystemStore(fs).recall(outputFile, Response.class);
 
     // Perform decryption and output the result file
     DecryptResponse decryptResponse = new DecryptResponse(response, querier);

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
index aeda7dc..c33971e 100644
--- a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
+++ b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
@@ -34,6 +34,7 @@ import org.apache.pirk.query.wideskies.QueryUtils;
 import org.apache.pirk.responder.wideskies.standalone.Responder;
 import org.apache.pirk.response.wideskies.Response;
 import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
 import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.json.simple.JSONObject;
@@ -60,6 +61,7 @@ public class StandaloneQuery
     ArrayList<QueryResponseJSON> results = null;
 
     // Create the necessary files
+    LocalFileSystemStore storage = new LocalFileSystemStore();
     String querySideOuputFilePrefix = "querySideOut";
     File fileQuerier = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERIER_FILETAG, ".txt");
     File fileQuery = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERY_FILETAG, ".txt");
@@ -98,11 +100,12 @@ public class StandaloneQuery
     }
 
     // Write necessary output files
-    encryptQuery.writeOutputFiles(fileQuerier, fileQuery);
+    storage.store(fileQuerier, encryptQuery.getQuerier());
+    storage.store(fileQuery, encryptQuery.getQuery());
 
     // Perform the PIR query and build the response elements
     logger.info("Performing the PIR Query and constructing the response elements:");
-    Query query = Query.readFromFile(fileQuery);
+    Query query = storage.recall(fileQuery, Query.class);
     Responder pirResponder = new Responder(query);
     logger.info("Query and Responder elements constructed");
     for (JSONObject jsonData : dataElements)
@@ -123,14 +126,14 @@ public class StandaloneQuery
     logger.info("Forming response from response elements; writing to a file");
     pirResponder.setResponseElements();
     Response responseOut = pirResponder.getResponse();
-    responseOut.writeToFile(fileResponse);
+    storage.store(fileResponse, responseOut);
     logger.info("Completed forming response from response elements and writing to a file");
 
     // Perform decryption
     // Reconstruct the necessary objects from the files
     logger.info("Performing decryption; writing final results file");
-    Response responseIn = Response.readFromFile(fileResponse);
-    Querier querier = Querier.readFromFile(fileQuerier);
+    Response responseIn = storage.recall(fileResponse, Response.class);
+    Querier querier = storage.recall(fileQuerier, Querier.class);
 
     // Perform decryption and output the result file
     DecryptResponse decryptResponse = new DecryptResponse(responseIn, querier);


Mime
View raw message