From: eawilliams@apache.org
To: commits@pirk.incubator.apache.org
Reply-To: dev@pirk.incubator.apache.org
Date: Sat, 23 Jul 2016 15:29:15 -0000
In-Reply-To: <047cc6ab63f443a297a4bd1af9987e5c@git.apache.org>
References: <047cc6ab63f443a297a4bd1af9987e5c@git.apache.org>
Subject: [3/4] incubator-pirk git commit: PIRK-11 Switch to SLF4J with Log4j2 bindings, including other minor cleanup - closes apache/incubator-pirk#20
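The bulk of PIRK-11 is the mechanical substitution visible in every file below: each class drops the log4j 1.x Logger obtained through the project's LogUtils helper and instead holds a private static final SLF4J logger, with Log4j2 supplying the binding at runtime. A minimal before/after sketch of the pattern (MyClass is a placeholder, not a class in this commit):

    // Before: log4j 1.x logger obtained via the project helper
    // import org.apache.log4j.Logger;
    // import org.apache.pirk.utils.LogUtils;
    //
    //   private static Logger logger = LogUtils.getLoggerForThisClass();

    // After: code compiles against the SLF4J API only
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyClass
    {
      private static final Logger logger = LoggerFactory.getLogger(MyClass.class);
    }

Making the field final also lets the compiler enforce that the logger is assigned exactly once per class.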
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
index 2f1e069..7eb264a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,23 +15,22 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.common;
 
 import java.math.BigInteger;
 import java.util.ArrayList;
 
 import org.apache.hadoop.io.MapWritable;
-import org.apache.log4j.Logger;
 import org.apache.pirk.inputformat.hadoop.BytesArrayWritable;
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.query.wideskies.QueryUtils;
 import org.apache.pirk.schema.data.DataSchema;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.utils.KeyedHash;
-import org.apache.pirk.utils.LogUtils;
 import org.json.simple.JSONObject;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -40,12 +39,12 @@ import scala.Tuple2;
  */
 public class HashSelectorAndPartitionData
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(HashSelectorAndPartitionData.class);
 
   public static Tuple2> hashSelectorAndFormPartitionsBigInteger(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema, QueryInfo queryInfo) throws Exception
   {
-    Tuple2> returnTuple = null;
+    Tuple2> returnTuple;
 
     // Pull the selector based on the query type
     String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema);
@@ -56,7 +55,7 @@ public class HashSelectorAndPartitionData
     // Partition by the given partitionSize
     ArrayList hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector());
 
-    returnTuple = new Tuple2>(hash, hitValPartitions);
+    returnTuple = new Tuple2<>(hash, hitValPartitions);
 
     return returnTuple;
   }
@@ -64,7 +63,7 @@ public class HashSelectorAndPartitionData
   public static Tuple2 hashSelectorAndFormPartitions(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema, QueryInfo queryInfo) throws Exception
   {
-    Tuple2 returnTuple = null;
+    Tuple2 returnTuple;
 
     // Pull the selector based on the query type
     String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema);
@@ -76,14 +75,14 @@ public class HashSelectorAndPartitionData
     ArrayList hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector());
     BytesArrayWritable bAW = new BytesArrayWritable(hitValPartitions);
 
-    returnTuple = new Tuple2(hash, bAW);
+    returnTuple = new Tuple2<>(hash, bAW);
 
     return returnTuple;
   }
 
   public static Tuple2> hashSelectorAndFormPartitions(JSONObject json, QueryInfo queryInfo) throws Exception
   {
-    Tuple2> returnTuple = null;
+    Tuple2> returnTuple;
 
     // Pull the selector based on the query type
     String selector = QueryUtils.getSelectorByQueryTypeJSON(queryInfo.getQueryType(), json);
@@ -94,7 +93,7 @@ public class HashSelectorAndPartitionData
     // Partition by the given partitionSize
     ArrayList hitValPartitions = QueryUtils.partitionDataElement(queryInfo.getQueryType(), json, queryInfo.getEmbedSelector());
 
-    returnTuple = new Tuple2>(hash, hitValPartitions);
+    returnTuple = new Tuple2<>(hash, hitValPartitions);
 
     return returnTuple;
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
index 5aa3ffe..92993ff 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.mapreduce;
 
 import java.io.IOException;
@@ -23,9 +23,9 @@ import java.io.IOException;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
 import org.apache.pirk.utils.CSVOutputUtils;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Pass through mapper for encrypted column multiplication
@@ -33,10 +33,10 @@ import org.apache.pirk.utils.LogUtils;
  */
 public class ColumnMultMapper extends Mapper
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(ColumnMultMapper.class);
 
-  LongWritable keyOut = null;
-  Text valueOut = null;
+  private LongWritable keyOut = null;
+  private Text valueOut = null;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
index cfca83c..abffadf 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.mapreduce;
 
 import java.io.IOException;
@@ -27,10 +27,10 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.log4j.Logger;
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.utils.FileConst;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reducer to perform encrypted column multiplication
@@ -38,12 +38,12 @@ import org.apache.pirk.utils.LogUtils;
  */
 public class ColumnMultReducer extends Reducer
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(ColumnMultReducer.class);
 
-  Text outputValue = null;
+  private Text outputValue = null;
   private MultipleOutputs mos = null;
-  Query query = null;
+  private Query query = null;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException
@@ -51,7 +51,7 @@ public class ColumnMultReducer extends Reducer
-    mos = new MultipleOutputs(ctx);
+    mos = new MultipleOutputs<>(ctx);
 
     FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
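As the three diffs above show, the commit folds small Java 7 cleanups in alongside the logging switch: package-private fields become private, locals that are definitely assigned lose their redundant = null initializers, and explicit constructor type arguments give way to the diamond operator. An illustrative sketch of the diamond change only (the types here are stand-ins, not Pirk classes):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;

    public class DiamondSketch
    {
      public static void main(String[] args)
      {
        // Pre-Java-7 style, as on the removed lines:
        Map<Integer,String> verbose = new HashMap<Integer,String>();

        // Java 7 diamond operator, as on the added lines; the compiler
        // infers the constructor's type arguments from the declaration.
        Map<Integer,String> concise = new HashMap<>();
        ArrayList<Integer> keys = new ArrayList<>(concise.keySet());
      }
    }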
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 827f0b1..fb3027b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.mapreduce;
 
 import java.io.BufferedReader;
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
-import org.apache.log4j.Logger;
 import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
 import org.apache.pirk.inputformat.hadoop.BytesArrayWritable;
 import org.apache.pirk.inputformat.hadoop.InputFormatConst;
@@ -52,9 +51,10 @@ import org.apache.pirk.schema.query.LoadQuerySchemas;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.utils.FileConst;
 import org.apache.pirk.utils.HDFS;
-import org.apache.pirk.utils.LogUtils;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tool for computing the PIR response in MapReduce
@@ -62,14 +62,14 @@ import org.elasticsearch.hadoop.mr.EsInputFormat;
  * Each query run consists of three MR jobs:
  *
 * (1) Map: Initialization mapper reads data using an extension of the BaseInputFormat or elasticsearch and, according to the QueryInfo object, extracts the
- * selector from each dataElement according to the QueryType, hashes selector, and outputs {@code}
+ * selector from each dataElement according to the QueryType, hashes selector, and outputs {@link }
 *
 * Reduce: Calculates the encrypted row values for each selector and corresponding data element, striping across columns,and outputs each row entry by column
- * position: {@code}
+ * position: {@link }
 *
 * (2) Map: Pass through mapper to aggregate by column number
 *
- * Reduce: Input: {@code >}; multiplies all colVals according to the encryption algorithm and outputs {@code} for each colNum
+ * Reduce: Input: {@link >}; multiplies all colVals according to the encryption algorithm and outputs {@link } for each colNum
 *
 * (3) Map: Pass through mapper to move all final columns to one reducer
 *
@@ -89,32 +89,32 @@ import org.elasticsearch.hadoop.mr.EsInputFormat;
  */
 public class ComputeResponseTool extends Configured implements Tool
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(ComputeResponseTool.class);
 
-  String dataInputFormat = null;
-  String inputFile = null;
-  String outputFile = null;
-  String outputDirExp = null;
-  String outputDirInit = null;
-  String outputDirColumnMult = null;
-  String outputDirFinal = null;
-  String queryInputDir = null;
-  String stopListFile = null;
-  int numReduceTasks = 1;
+  private String dataInputFormat = null;
+  private String inputFile = null;
+  private String outputFile = null;
+  private String outputDirExp = null;
+  private String outputDirInit = null;
+  private String outputDirColumnMult = null;
+  private String outputDirFinal = null;
+  private String queryInputDir = null;
+  private String stopListFile = null;
+  private int numReduceTasks = 1;
 
-  boolean useHDFSLookupTable = false;
+  private boolean useHDFSLookupTable = false;
 
-  String esQuery = "none";
-  String esResource = "none";
+  private String esQuery = "none";
+  private String esResource = "none";
   String dataSchema = "none";
 
-  Configuration conf = null;
-  FileSystem fs = null;
+  private Configuration conf = null;
+  private FileSystem fs = null;
 
-  Query query = null;
-  QueryInfo queryInfo = null;
-  QuerySchema qSchema = null;
+  private Query query = null;
+  private QueryInfo queryInfo = null;
+  private QuerySchema qSchema = null;
 
   public ComputeResponseTool() throws Exception
   {
@@ -223,7 +223,7 @@ public class ComputeResponseTool extends Configured implements Tool
   private boolean computeExpTable() throws IOException, ClassNotFoundException, InterruptedException
   {
-    boolean success = false;
+    boolean success;
 
     logger.info("Creating expTable");
 
@@ -235,7 +235,7 @@ public class ComputeResponseTool extends Configured implements Tool
     }
     // Write the query hashes to the split files
     TreeMap queryElements = query.getQueryElements();
-    ArrayList keys = new ArrayList(queryElements.keySet());
+    ArrayList keys = new ArrayList<>(queryElements.keySet());
 
     int numSplits = Integer.parseInt(SystemConfiguration.getProperty("pir.expCreationSplits", "100"));
     int elementsPerSplit = (int) Math.floor(queryElements.size() / numSplits);
@@ -302,13 +302,13 @@ public class ComputeResponseTool extends Configured implements Tool
 
     // Assemble the exp table from the output
     // element_index -> fileName
-    HashMap expFileTable = new HashMap();
+    HashMap expFileTable = new HashMap<>();
     FileStatus[] status = fs.listStatus(outPathExp);
     for (FileStatus fstat : status)
     {
-      if (fstat.getPath().getName().toString().startsWith(FileConst.PIR))
+      if (fstat.getPath().getName().startsWith(FileConst.PIR))
       {
-        logger.info("fstat.getPath().getName().toString() = " + fstat.getPath().getName().toString());
+        logger.info("fstat.getPath().getName().toString() = " + fstat.getPath().getName());
         try
         {
           InputStreamReader isr = new InputStreamReader(fs.open(fstat.getPath()));
@@ -340,7 +340,7 @@ public class ComputeResponseTool extends Configured implements Tool
   private boolean readDataEncRows(Path outPathInit) throws Exception
   {
-    boolean success = false;
+    boolean success;
 
     Job job = new Job(conf, "pirMR");
     job.setSpeculativeExecution(false);
@@ -432,7 +432,7 @@ public class ComputeResponseTool extends Configured implements Tool
   private boolean multiplyColumns(Path outPathInit, Path outPathColumnMult) throws IOException, ClassNotFoundException, InterruptedException
   {
-    boolean success = false;
+    boolean success;
 
     Job columnMultJob = new Job(conf, "pir_columnMult");
     columnMultJob.setSpeculativeExecution(false);
@@ -460,7 +460,7 @@ public class ComputeResponseTool extends Configured implements Tool
     FileStatus[] status = fs.listStatus(outPathInit);
     for (FileStatus fstat : status)
     {
-      if (fstat.getPath().getName().toString().startsWith(FileConst.PIR))
+      if (fstat.getPath().getName().startsWith(FileConst.PIR))
      {
         logger.info("fstat.getPath() = " + fstat.getPath().toString());
         FileInputFormat.addInputPath(columnMultJob, fstat.getPath());
@@ -492,7 +492,7 @@ public class ComputeResponseTool extends Configured implements Tool
   private boolean computeFinalResponse(Path outPathFinal) throws ClassNotFoundException, IOException, InterruptedException
   {
-    boolean success = false;
+    boolean success;
 
     Job finalResponseJob = new Job(conf, "pir_finalResponse");
     finalResponseJob.setSpeculativeExecution(false);
@@ -522,7 +522,7 @@ public class ComputeResponseTool extends Configured implements Tool
     FileStatus[] status = fs.listStatus(new Path(outputDirColumnMult));
     for (FileStatus fstat : status)
     {
-      if (fstat.getPath().getName().toString().startsWith(FileConst.PIR_COLS))
+      if (fstat.getPath().getName().startsWith(FileConst.PIR_COLS))
      {
         logger.info("fstat.getPath() = " + fstat.getPath().toString());
         FileInputFormat.addInputPath(finalResponseJob, fstat.getPath());
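The ComputeResponseTool javadoc above describes the responder as three chained MapReduce jobs, and the diff shows each stage returning a success flag from its run. A hedged sketch of that chaining idiom using the same org.apache.hadoop.mapreduce.Job API the tool itself uses (job names and paths here are placeholders, not Pirk's configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class JobChainSketch
    {
      public static boolean run(Configuration conf) throws Exception
      {
        // Stage 1: e.g. hash selectors and compute encrypted row values
        Job init = new Job(conf, "stage1");
        init.setSpeculativeExecution(false);
        FileInputFormat.addInputPath(init, new Path("/data/in"));
        FileOutputFormat.setOutputPath(init, new Path("/stage1/out"));
        boolean success = init.waitForCompletion(true);

        if (success)
        {
          // Stage 2: e.g. encrypted column multiplication over stage 1 output
          Job colMult = new Job(conf, "stage2");
          colMult.setSpeculativeExecution(false);
          FileInputFormat.addInputPath(colMult, new Path("/stage1/out"));
          FileOutputFormat.setOutputPath(colMult, new Path("/stage2/out"));
          success = colMult.waitForCompletion(true);
        }

        // A third job assembling the final Response would follow the same gate.
        return success;
      }
    }

Every job in the diff disables speculative execution, presumably so duplicate task attempts cannot emit the same rows or columns twice.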
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
index 4b0a3b3..28d49a3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.mapreduce;
 
 import java.io.IOException;
@@ -26,10 +26,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
 import org.apache.pirk.encryption.ModPowAbstraction;
 import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to generate the expTable given the input query vectors
@@ -37,15 +37,13 @@ import org.apache.pirk.utils.LogUtils;
  */
 public class ExpTableMapper extends Mapper
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(ExpTableMapper.class);
 
-  Text keyOut = null;
-  Text valueOut = null;
+  private Text valueOut = null;
 
-  int dataPartitionBitSize = 0;
-  int maxValue = 0;
-  BigInteger NSquared = null;
-  Query query = null;
+  private int maxValue = 0;
+  private BigInteger NSquared = null;
+  private Query query = null;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException
@@ -58,7 +56,7 @@ public class ExpTableMapper extends Mapper
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
     query = Query.readFromHDFSFile(new Path(queryDir), fs);
 
-    dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize();
+    int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize();
     maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1;
 
     NSquared = query.getNSquared();
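ExpTableMapper precomputes, for each query element, element^i mod N^2 for every partition value i in [0, 2^dataPartitionBitSize - 1]. A standalone sketch of that table using java.math.BigInteger.modPow directly (Pirk routes the operation through its ModPowAbstraction; the method and names below are illustrative):

    import java.math.BigInteger;
    import java.util.HashMap;
    import java.util.Map;

    public class ExpTableSketch
    {
      // value i -> element^i mod N^2, for i in [0, 2^dataPartitionBitSize - 1]
      public static Map<Integer,BigInteger> buildTable(BigInteger element, BigInteger nSquared, int dataPartitionBitSize)
      {
        int maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1;
        Map<Integer,BigInteger> table = new HashMap<>();
        for (int i = 0; i <= maxValue; ++i)
        {
          table.put(i, element.modPow(BigInteger.valueOf(i), nSquared));
        }
        return table;
      }
    }

Precomputing the table trades memory for avoiding a modular exponentiation per data partition during row calculation.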
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
index 6bbd89b..fabf679 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.mapreduce;
 
 import java.io.IOException;
@@ -23,9 +23,9 @@ import java.io.IOException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.log4j.Logger;
 import org.apache.pirk.utils.FileConst;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reducer class to complete the exp lookup table and add to the Query object
@@ -33,16 +33,16 @@ import org.apache.pirk.utils.LogUtils;
  */
 public class ExpTableReducer extends Reducer
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(ExpTableReducer.class);
 
   private MultipleOutputs mos = null;
-  String reducerID = null;
+  private String reducerID = null;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException
   {
     super.setup(ctx);
-    mos = new MultipleOutputs(ctx);
+    mos = new MultipleOutputs<>(ctx);
     reducerID = String.format("%05d", ctx.getTaskAttemptID().getTaskID().getId());
     logger.info("reducerID = " + reducerID);
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
index f9a0881..1df7b0e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.mapreduce;
 
 import java.io.IOException;
@@ -27,11 +27,11 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.log4j.Logger;
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.response.wideskies.Response;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reducer class to construct the final Response object
@@ -39,28 +39,26 @@ import org.apache.pirk.utils.LogUtils;
  */
 public class FinalResponseReducer extends Reducer
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(FinalResponseReducer.class);
 
-  Text outputValue = null;
   private MultipleOutputs mos = null;
 
-  Response response = null;
-  String outputFile = null;
-  FileSystem fs = null;
-  QueryInfo queryInfo = null;
+  private Response response = null;
+  private String outputFile = null;
+  private FileSystem fs = null;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException
   {
     super.setup(ctx);
 
-    outputValue = new Text();
-    mos = new MultipleOutputs(ctx);
+    Text outputValue = new Text();
+    mos = new MultipleOutputs<>(ctx);
 
     fs = FileSystem.newInstance(ctx.getConfiguration());
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
     Query query = Query.readFromHDFSFile(new Path(queryDir), fs);
-    queryInfo = query.getQueryInfo();
+    QueryInfo queryInfo = query.getQueryInfo();
 
     outputFile = ctx.getConfiguration().get("pirMR.outputFile");

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
index c3f672e..95396a9 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.mapreduce;
 
 import java.io.IOException;
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
 import org.apache.pirk.inputformat.hadoop.BytesArrayWritable;
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.query.wideskies.QueryInfo;
@@ -37,32 +36,31 @@ import org.apache.pirk.schema.data.LoadDataSchemas;
 import org.apache.pirk.schema.query.LoadQuerySchemas;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.filter.DataFilter;
-import org.apache.pirk.utils.LogUtils;
 import org.apache.pirk.utils.StringUtils;
 import org.apache.pirk.utils.SystemConfiguration;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
  * Initialization mapper for PIR
 *
 * Reads in data, extracts the selector by queryType from each dataElement, performs a keyed hash of the selector, extracts the partitions of the dataElement,
- * and emits {@}
+ * and emits {@link }
 *
 */
 public class HashSelectorsAndPartitionDataMapper extends Mapper
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(HashSelectorsAndPartitionDataMapper.class);
 
-  IntWritable keyOut = null;
+  private IntWritable keyOut = null;
 
   HashSet stopList = null;
 
-  Query query = null;
-  QueryInfo queryInfo = null;
-  QuerySchema qSchema = null;
-  DataSchema dSchema = null;
-  Object filter = null;
+  private QueryInfo queryInfo = null;
+  private QuerySchema qSchema = null;
+  private DataSchema dSchema = null;
+  private Object filter = null;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException
@@ -77,7 +75,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(RowCalcReducer.class);
 
-  LongWritable keyOut = null;
-  Text valueOut = null;
+  private LongWritable keyOut = null;
+  private Text valueOut = null;
 
   private MultipleOutputs mos = null;
 
-  FileSystem fs = null;
-  Query query = null;
-  QueryInfo queryInfo = null;
-  QuerySchema qSchema = null;
-  DataSchema dSchema = null;
+  private FileSystem fs = null;
+  private Query query = null;
+  private QueryInfo queryInfo = null;
 
-  boolean useLocalCache = false;
-  boolean limitHitsPerSelector = false;
-  int maxHitsPerSelector = 1000;
+  private boolean useLocalCache = false;
+  private boolean limitHitsPerSelector = false;
+  private int maxHitsPerSelector = 1000;
 
   @Override
   public void setup(Context ctx) throws IOException, InterruptedException
@@ -78,7 +75,7 @@ public class RowCalcReducer extends Reducer
-    mos = new MultipleOutputs(ctx);
+    mos = new MultipleOutputs<>(ctx);
 
     fs = FileSystem.newInstance(ctx.getConfiguration());
     String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
@@ -99,8 +96,8 @@ public class RowCalcReducer extends Reducer

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
-  Accumulator numRecordsReceived = null;
-  Accumulator numRecordsFiltered = null;
-  Accumulator numRecordsAfterFilter = null;
-  Accumulator numHashes = null;
-  Accumulator numColumns = null;
+  private Accumulator numRecordsReceived = null;
+  private Accumulator numRecordsFiltered = null;
+  private Accumulator numRecordsAfterFilter = null;
+  private Accumulator numHashes = null;
+  private Accumulator numColumns = null;
 
   public Accumulators(JavaSparkContext sc)
   {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
index b4a2bd6..89ce35f 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.spark;
 
 import java.io.Serializable;
@@ -33,19 +33,19 @@ public class BroadcastVars implements Serializable
 {
   private static final long serialVersionUID = 1L;
 
-  transient JavaSparkContext jsc = null;
+  private transient JavaSparkContext jsc = null;
 
   Broadcast query = null;
 
-  Broadcast queryInfo = null;
+  private Broadcast queryInfo = null;
 
-  Broadcast useLocalCache = null;
+  private Broadcast useLocalCache = null;
 
-  Broadcast limitHitsPerSelector = null;
+  private Broadcast limitHitsPerSelector = null;
 
-  Broadcast maxHitsPerSelector = null;
+  private Broadcast maxHitsPerSelector = null;
 
-  Broadcast expDir = null;
+  private Broadcast expDir = null;
 
   public BroadcastVars(JavaSparkContext sc)
   {
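BroadcastVars is a serializable holder for Spark broadcast variables: read-only values shipped to each executor once rather than serialized into every task closure. A sketch of the underlying idiom with one representative field (abbreviated; not the full Pirk class):

    import java.io.Serializable;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class BroadcastVarsSketch implements Serializable
    {
      private static final long serialVersionUID = 1L;

      // transient: the context itself must never be captured in a task closure
      private transient JavaSparkContext jsc;

      private Broadcast<Boolean> limitHitsPerSelector;

      public BroadcastVarsSketch(JavaSparkContext sc)
      {
        jsc = sc;
      }

      public void setLimitHitsPerSelector(boolean limit)
      {
        limitHitsPerSelector = jsc.broadcast(limit); // shipped to executors once
      }

      public boolean getLimitHitsPerSelector()
      {
        return limitHitsPerSelector.value(); // read on the executor
      }
    }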
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
index 0db43f8..938c32e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.spark;
 
 import java.io.IOException;
@@ -27,14 +27,13 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
 import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.utils.LogUtils;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -43,7 +42,7 @@ import scala.Tuple2;
  */
 public class ComputeExpLookupTable
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(ComputeExpLookupTable.class);
 
   /**
    * Method to create the distributed modular exponentiation lookup table in hdfs for a given Query
@@ -64,7 +63,7 @@ public class ComputeExpLookupTable
   public static JavaPairRDD>> computeExpTable(JavaSparkContext sc, FileSystem fs, BroadcastVars bVars, Query query, String queryInputFile, String outputDirExp, boolean useModExpJoin)
   {
-    JavaPairRDD>> expCalculations = null;
+    JavaPairRDD>> expCalculations;
 
     logger.info("Creating expTable in hdfs for queryName = " + query.getQueryInfo().getQueryName());
 
@@ -83,7 +82,7 @@ public class ComputeExpLookupTable
     // Write the query hashes to a RDD
     TreeMap queryElements = query.getQueryElements();
-    ArrayList keys = new ArrayList(queryElements.keySet());
+    ArrayList keys = new ArrayList<>(queryElements.keySet());
 
     int numSplits = Integer.parseInt(SystemConfiguration.getProperty("pir.expCreationSplits", "100"));
     JavaRDD queryHashes = sc.parallelize(keys, numSplits);
@@ -100,7 +99,7 @@ public class ComputeExpLookupTable
     // Place exp table in query object and in the BroadcastVars
     Map queryHashFileNameMap = hashToPartition.collectAsMap();
-    query.setExpFileBasedLookup(new HashMap(queryHashFileNameMap));
+    query.setExpFileBasedLookup(new HashMap<>(queryHashFileNameMap));
     query.writeToHDFSFile(new Path(queryInputFile), fs);
     bVars.setQuery(query);
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index 5391e41..ba7fd12 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -15,7 +15,7 @@
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.responder.wideskies.spark;
 
 import java.math.BigInteger;
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.log4j.Logger;
 import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
 import org.apache.pirk.inputformat.hadoop.InputFormatConst;
 import org.apache.pirk.query.wideskies.Query;
@@ -37,7 +36,6 @@ import org.apache.pirk.response.wideskies.Response;
 import org.apache.pirk.schema.data.LoadDataSchemas;
 import org.apache.pirk.schema.query.LoadQuerySchemas;
 import org.apache.pirk.schema.query.QuerySchema;
-import org.apache.pirk.utils.LogUtils;
 import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.SparkConf;
@@ -45,7 +43,8 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.elasticsearch.hadoop.mr.EsInputFormat;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -62,35 +61,34 @@ import scala.Tuple2;
  */
 public class ComputeResponse
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(ComputeResponse.class);
 
-  String dataInputFormat = null;
-  String inputData = null;
-  String outputFile = null;
-  String outputDirExp = null;
+  private String dataInputFormat = null;
+  private String inputData = null;
+  private String outputFile = null;
+  private String outputDirExp = null;
 
-  String queryInput = null;
-  String stopListFile = null;
+  private String queryInput = null;
 
-  String esQuery = "none";
-  String esResource = "none";
+  private String esQuery = "none";
+  private String esResource = "none";
 
-  boolean useHDFSLookupTable = false;
-  boolean useModExpJoin = false;
+  private boolean useHDFSLookupTable = false;
+  private boolean useModExpJoin = false;
 
-  FileSystem fs = null;
-  JavaSparkContext sc = null;
+  private FileSystem fs = null;
+  private JavaSparkContext sc = null;
 
-  Accumulators accum = null;
-  BroadcastVars bVars = null;
+  private Accumulators accum = null;
+  private BroadcastVars bVars = null;
 
-  QueryInfo queryInfo = null;
+  private QueryInfo queryInfo = null;
   Query query = null;
 
-  int numDataPartitions = 0;
-  int numColMultPartitions = 0;
+  private int numDataPartitions = 0;
+  private int numColMultPartitions = 0;
 
-  boolean colMultReduceByKey = false;
+  private boolean colMultReduceByKey = false;
 
   public ComputeResponse(FileSystem fileSys) throws Exception
   {
@@ -129,7 +127,7 @@ public class ComputeResponse
     outputDirExp = outputFile + "_exp";
 
     queryInput = SystemConfiguration.getProperty("pir.queryInput");
-    stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
+    String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
     useModExpJoin = SystemConfiguration.getProperty("pir.useModExpJoin", "false").equals("true");
 
     logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
@@ -199,7 +197,7 @@ public class ComputeResponse
   /**
    * Method to read in data from an allowed input source/format and perform the query
    */
-  public void performQuery() throws ClassNotFoundException, Exception
+  public void performQuery() throws Exception
   {
     logger.info("Performing query: ");
 
@@ -223,7 +221,7 @@ public class ComputeResponse
   {
     logger.info("Reading data ");
 
-    JavaRDD dataRDD = null;
+    JavaRDD dataRDD;
 
     Job job = new Job();
     String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
@@ -271,7 +269,7 @@ public class ComputeResponse
   {
     logger.info("Reading data ");
 
-    JavaRDD dataRDD = null;
+    JavaRDD dataRDD;
 
     Job job = new Job();
     String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
@@ -314,7 +312,7 @@ public class ComputeResponse
     JavaPairRDD>> selectorGroupRDD = selectorHashToDocRDD.groupByKey();
 
     // Calculate the encrypted row values for each row, emit for each row
-    JavaPairRDD encRowRDD = null;
+    JavaPairRDD encRowRDD;
     if (useModExpJoin)
     {
       // If we are pre-computing the modular exponentiation table and then joining the data partitions
@@ -347,7 +345,7 @@ public class ComputeResponse
   private void encryptedColumnCalc(JavaPairRDD encRowRDD) throws PIRException
   {
     // Multiply the column values by colNum: emit
-    JavaPairRDD encColRDD = null;
+    JavaPairRDD encColRDD;
     if (colMultReduceByKey)
    {
       encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
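encryptedColumnCalc above aggregates the encrypted columns with reduceByKey when colMultReduceByKey is set, combining values map-side before the shuffle; the alternative branch is not shown in this hunk. A hedged sketch of the reduceByKey path with a stand-in combiner (the real one is Pirk's EncColMultReducer; nSquared and the modular multiplication stand in for the encryption algorithm and are assumptions):

    import java.math.BigInteger;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function2;

    public class ColumnMultSketch
    {
      // colNum -> encrypted column value; multiply all values sharing a column number
      public static JavaPairRDD<Long,BigInteger> multiplyColumns(JavaPairRDD<Long,BigInteger> encRows,
          final BigInteger nSquared, int numPartitions)
      {
        // reduceByKey pre-combines pairs on each partition before shuffling,
        // which is valid here because modular multiplication is associative.
        return encRows.reduceByKey(new Function2<BigInteger,BigInteger,BigInteger>()
        {
          public BigInteger call(BigInteger a, BigInteger b)
          {
            return a.multiply(b).mod(nSquared);
          }
        }, numPartitions);
      }
    }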
- *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.PairFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -35,19 +34,14 @@ public class EncColMultGroupedMapper implements PairFunction(); + expTable = new HashMap<>(); logger.info("Initialized EncRowCalcPrecomputedCache - limitHitsPerSelector = " + limitHitsPerSelector + " maxHitsPerSelector = " + maxHitsPerSelector); } @@ -84,7 +77,7 @@ public class EncRowCalcPrecomputedCache implements public Iterable> call(Tuple2>,Iterable>>> hashDocTuple) throws Exception { - ArrayList> returnPairs = new ArrayList>(); + ArrayList> returnPairs = new ArrayList<>(); int rowIndex = hashDocTuple._1; accum.incNumHashes(1); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java index c417a9d..0642e22 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
- *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.io.BufferedWriter; @@ -27,11 +27,10 @@ import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -41,15 +40,13 @@ public class ExpKeyFilenameMap implements PairFlatMapFunction>> call(Integer queryHashKey) throws Exception { // queryHashKey -> <,> - ArrayList>> modExp = new ArrayList>>(); + ArrayList>> modExp = new ArrayList<>(); BigInteger element = query.getQueryElement(queryHashKey); for (int i = 0; i <= maxValue; ++i) { BigInteger modPow = ModPowAbstraction.modPow(element, BigInteger.valueOf(i), NSquared); - Tuple2 modPowTuple = new Tuple2(i, modPow); - modExp.add(new Tuple2>(queryHashKey, modPowTuple)); + Tuple2 modPowTuple = new Tuple2<>(i, modPow); + modExp.add(new Tuple2<>(queryHashKey, modPowTuple)); } return modExp; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java index 3eb37f6..2a54a38 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,19 +15,19 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
- *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import org.apache.hadoop.io.MapWritable; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.query.filter.DataFilter; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to filter data as per the provided Filter (via the QuerySchema) @@ -36,19 +36,17 @@ public class FilterData implements Function { private static final long serialVersionUID = 1L; - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(FilterData.class); - Accumulators accum = null; - BroadcastVars bbVars = null; - DataSchema dSchema = null; - Object filter = null; + private Accumulators accum = null; + private DataSchema dSchema = null; + private Object filter = null; public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn) throws Exception { accum = accumIn; - bbVars = bbVarsIn; - QueryInfo queryInfo = bbVars.getQueryInfo(); + QueryInfo queryInfo = bbVarsIn.getQueryInfo(); QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java index b614d42..bbd0edd 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,23 +15,22 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
- *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; import java.util.ArrayList; import org.apache.hadoop.io.MapWritable; -import org.apache.log4j.Logger; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.responder.wideskies.common.HashSelectorAndPartitionData; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; -import org.apache.pirk.utils.LogUtils; import org.apache.spark.api.java.function.PairFunction; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -43,21 +42,15 @@ public class HashSelectorsAndPartitionData implements PairFunction> call(MapWritable doc) throws Exception { - Tuple2> returnTuple = null; + Tuple2> returnTuple; // Extract the selector, compute the hash, and partition the data element according to query type returnTuple = HashSelectorAndPartitionData.hashSelectorAndFormPartitionsBigInteger(doc, qSchema, dSchema, queryInfo); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java index d851d70..879b618 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,28 +15,27 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
- *******************************************************************************/ + */ package org.apache.pirk.responder.wideskies.standalone; import java.io.BufferedReader; -import java.io.File; import java.io.FileReader; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.TreeMap; -import org.apache.log4j.Logger; import org.apache.pirk.encryption.ModPowAbstraction; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.utils.KeyedHash; -import org.apache.pirk.utils.LogUtils; import org.apache.pirk.utils.SystemConfiguration; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to perform stand alone responder functionalities @@ -51,18 +50,18 @@ import org.json.simple.parser.JSONParser; */ public class Responder { - private static Logger logger = LogUtils.getLoggerForThisClass(); + private static final Logger logger = LoggerFactory.getLogger(Responder.class); - Query query = null; - QueryInfo queryInfo = null; + private Query query = null; + private QueryInfo queryInfo = null; - String queryType = null; + private String queryType = null; - Response response = null; + private Response response = null; - TreeMap columns = null; // the column values for the PIR calculations + private TreeMap columns = null; // the column values for the PIR calculations - ArrayList rowColumnCounters; // keeps track of how many hit partitions have been recorded for each row/selector + private ArrayList rowColumnCounters; // keeps track of how many hit partitions have been recorded for each row/selector public Responder(Query queryInput) { @@ -73,10 +72,10 @@ public class Responder response = new Response(queryInfo); // Columns are allocated as needed, initialized to 1 - columns = new TreeMap(); + columns = new TreeMap<>(); // Initialize row counters - rowColumnCounters = new ArrayList(); + rowColumnCounters = new ArrayList<>(); for (int i = 0; i < Math.pow(2, queryInfo.getHashBitSize()); ++i) { rowColumnCounters.add(0); @@ -178,7 +177,7 @@ public class Responder BigInteger column = columns.get(i + rowCounter); // the next 'free' column relative to the selector logger.debug("Before: columns.get(" + (i + rowCounter) + ") = " + columns.get(i + rowCounter)); - BigInteger exp = null; + BigInteger exp; if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) // using the standalone // lookup table { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/response/wideskies/Response.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java index 6127b06..3d2a3c0 100644 --- a/src/main/java/org/apache/pirk/response/wideskies/Response.java +++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * KIND, either express or implied. 
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/response/wideskies/Response.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java
index 6127b06..3d2a3c0 100644
--- a/src/main/java/org/apache/pirk/response/wideskies/Response.java
+++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.response.wideskies;
 
 import java.io.File;
@@ -30,9 +30,9 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
 import org.apache.pirk.query.wideskies.QueryInfo;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to hold the encrypted response elements for the PIR query
@@ -44,17 +44,17 @@ public class Response implements Serializable
 {
   private static final long serialVersionUID = 1L;
 
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(Response.class);
 
-  QueryInfo queryInfo = null; // holds all query info
+  private QueryInfo queryInfo = null; // holds all query info
 
-  TreeMap<Long,BigInteger> responseElements = null; // encrypted response columns, colNum -> column
+  private TreeMap<Long,BigInteger> responseElements = null; // encrypted response columns, colNum -> column
 
   public Response(QueryInfo queryInfoInput)
   {
     queryInfo = queryInfoInput;
 
-    responseElements = new TreeMap<Long,BigInteger>();
+    responseElements = new TreeMap<>();
   }
 
   public TreeMap<Long,BigInteger> getResponseElements()
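
Note: Response implements Serializable, and org.slf4j.Logger is not Serializable, but static fields are never written by Java serialization, so the private static final logger above is safe. A self-contained sketch of that pattern (class name is illustrative, not from the patch):

    import java.io.Serializable;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SerializableHolder implements Serializable
    {
      private static final long serialVersionUID = 1L;

      // static fields are skipped during serialization, so a non-Serializable
      // Logger may be held here as long as it remains static
      private static final Logger logger = LoggerFactory.getLogger(SerializableHolder.class);
    }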
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/schema/data/DataSchema.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchema.java b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
index 11c3c8e..e0512bb 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchema.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.schema.data;
 
 import java.io.Serializable;
@@ -23,10 +23,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to hold a data schema
@@ -35,21 +35,21 @@ public class DataSchema implements Serializable
 {
   private static final long serialVersionUID = 1L;
 
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(DataSchema.class);
 
-  String schemaName = null;
+  private String schemaName = null;
 
-  String primitiveTypePartitionerName = null;
+  private String primitiveTypePartitionerName = null;
 
-  transient HashMap<String,Text> textRep = null; // string element name -> Text representation
+  private transient HashMap<String,Text> textRep = null; // string element name -> Text representation
 
-  transient HashMap<String,DataPartitioner> partitionerInstances = null; // partitioner class name -> Text representation
+  private transient HashMap<String,DataPartitioner> partitionerInstances = null; // partitioner class name -> Text representation
 
-  HashMap<String,String> typeMap = null; // string element name -> java type
+  private HashMap<String,String> typeMap = null; // string element name -> java type
 
-  HashMap<String,String> partitionerMap = null; // string element name -> partitioner class name
+  private HashMap<String,String> partitionerMap = null; // string element name -> partitioner class name
 
-  HashSet<String> listRep = null; // elements that are list/array types
+  private HashSet<String> listRep = null; // elements that are list/array types
 
   public DataSchema(String schemaNameInput, HashMap<String,Text> textRepInput, HashSet<String> listRepInput, HashMap<String,String> typeMapInput,
       HashMap<String,String> partitionerMapInput)
@@ -78,7 +78,7 @@ public class DataSchema implements Serializable
   private void constructTextRep()
   {
-    textRep = new HashMap<String,Text>();
+    textRep = new HashMap<>();
     for (String name : typeMap.keySet())
     {
       textRep.put(name, new Text(name));
@@ -101,7 +101,7 @@ public class DataSchema implements Serializable
   private void constructPartitionerInstances() throws Exception
   {
-    partitionerInstances = new HashMap<String,DataPartitioner>();
+    partitionerInstances = new HashMap<>();
     for (String partitionerName : partitionerMap.values())
     {
       if (!partitionerInstances.containsKey(partitionerName))
@@ -202,7 +202,7 @@ public class DataSchema implements Serializable
   public HashSet<String> getNonListRep()
   {
-    HashSet<String> elements = new HashSet<String>();
+    HashSet<String> elements = new HashSet<>();
     elements.addAll(textRep.keySet());
     elements.removeAll(listRep);
     return elements;
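
Note: DataSchema keeps its derived lookup tables (textRep, partitionerInstances) transient, so they come back null after deserialization and must be rebuilt before use. A generic sketch of that rebuild-on-demand pattern, with hypothetical names not taken from the patch:

    import java.io.Serializable;
    import java.util.HashMap;

    public class TransientCacheSketch implements Serializable
    {
      private static final long serialVersionUID = 1L;

      private HashMap<String,String> typeMap = new HashMap<>();

      // derived cache; transient fields deserialize to null, so the getter
      // rebuilds the map lazily on first access
      private transient HashMap<String,Integer> lengthCache = null;

      public HashMap<String,Integer> getLengthCache()
      {
        if (lengthCache == null)
        {
          lengthCache = new HashMap<>();
          for (String name : typeMap.keySet())
          {
            lengthCache.put(name, name.length());
          }
        }
        return lengthCache;
      }
    }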
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
index 5bbe754..73995e8 100644
--- a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
+++ b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *******************************************************************************/
+ */
 package org.apache.pirk.schema.data;
 
 import java.io.File;
@@ -30,11 +30,11 @@ import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
-import org.apache.pirk.utils.LogUtils;
 import org.apache.pirk.utils.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -64,11 +64,11 @@ import org.w3c.dom.NodeList;
  */
 public class LoadDataSchemas
 {
-  private static Logger logger = LogUtils.getLoggerForThisClass();
+  private static final Logger logger = LoggerFactory.getLogger(LoadDataSchemas.class);
 
-  public static HashMap<String,DataSchema> schemaMap;
+  private static HashMap<String,DataSchema> schemaMap;
 
-  public static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<String>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
+  private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
       PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, PrimitiveTypePartitioner.CHAR,
       PrimitiveTypePartitioner.STRING));
 
@@ -76,7 +76,7 @@ public class LoadDataSchemas
   {
     logger.info("Loading data schemas: ");
 
-    schemaMap = new HashMap<String,DataSchema>();
+    schemaMap = new HashMap<>();
     try
     {
       initialize();
@@ -126,21 +126,21 @@ public class LoadDataSchemas
   private static DataSchema loadDataSchemaFile(String schemaFile, boolean hdfs, FileSystem fs) throws Exception
   {
-    DataSchema dataSchema = null;
+    DataSchema dataSchema;
 
     // Initialize the elements needed to create the DataSchema
-    String schemaName = null;
-    HashMap<String,Text> textRep = new HashMap<String,Text>();
-    HashSet<String> listRep = new HashSet<String>();
-    HashMap<String,String> typeMap = new HashMap<String,String>();
-    HashMap<String,String> partitionerMap = new HashMap<String,String>();
-    HashMap<String,DataPartitioner> partitionerInstances = new HashMap<String,DataPartitioner>();
+    String schemaName;
+    HashMap<String,Text> textRep = new HashMap<>();
+    HashSet<String> listRep = new HashSet<>();
+    HashMap<String,String> typeMap = new HashMap<>();
+    HashMap<String,String> partitionerMap = new HashMap<>();
+    HashMap<String,DataPartitioner> partitionerInstances = new HashMap<>();
 
     DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
     DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
 
     // Read in and parse the schema file
-    Document doc = null;
+    Document doc;
     if (hdfs)
     {
       Path filePath = new Path(schemaFile);