metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject incubator-metron git commit: METRON-257 Enable pcap result pagination from the Pcap CLI (mmiklavc via cestella) closes apache/incubator-metron#256
Date Mon, 19 Sep 2016 13:04:44 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 095313255 -> e0c9970b1


METRON-257 Enable pcap result pagination from the Pcap CLI (mmiklavc via cestella) closes apache/incubator-metron#256


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/e0c9970b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/e0c9970b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/e0c9970b

Branch: refs/heads/master
Commit: e0c9970b1f467135f01ee212fb2a8d7f2d61de8b
Parents: 0953132
Author: mmiklavc <michael.miklavcic@gmail.com>
Authored: Mon Sep 19 09:04:32 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Mon Sep 19 09:04:32 2016 -0400

----------------------------------------------------------------------
 .../pcapservice/PcapReceiverImplRestEasy.java   |  38 +++--
 .../PcapReceiverImplRestEasyTest.java           |   4 +-
 .../common/hadoop/SequenceFileIterable.java     | 139 +++++++++++++++++++
 .../org/apache/metron/pcap/query/CliConfig.java |  18 ++-
 .../org/apache/metron/pcap/query/CliParser.java |  24 +++-
 .../org/apache/metron/pcap/query/PcapCli.java   |  31 +++--
 .../org/apache/metron/pcap/PcapJobTest.java     |  34 ++++-
 .../PcapTopologyIntegrationTest.java            |  48 +++----
 .../apache/metron/pcap/query/PcapCliTest.java   | 139 ++++++++++++-------
 .../java/org/apache/metron/pcap/mr/PcapJob.java |  43 +++---
 pom.xml                                         |   2 +-
 11 files changed, 383 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index 18b5dc9..5a2a0ae 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -19,11 +19,13 @@ package org.apache.metron.pcapservice;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
@@ -120,6 +122,7 @@ public class PcapReceiverImplRestEasy {
 
           throws IOException {
     PcapsResponse response = new PcapsResponse();
+    SequenceFileIterable results = null;
     try {
       if (startTime < 0) {
         startTime = 0L;
@@ -137,7 +140,7 @@ public class PcapReceiverImplRestEasy {
       if(LOGGER.isDebugEnabled()) {
         LOGGER.debug("Query received: " + query);
       }
-      response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
+      results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
               , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
               , startTime
               , endTime
@@ -146,13 +149,23 @@ public class PcapReceiverImplRestEasy {
               , CONFIGURATION.get()
               , FileSystem.get(CONFIGURATION.get())
               , new QueryPcapFilter.Configurator()
-              )
       );
 
+      response.setPcaps(results != null ? Lists.newArrayList(results) : null);
     } catch (Exception e) {
       LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
               e);
       throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+    } finally {
+      if (null != results) {
+        // Deleting the temporary HDFS result files is best effort: an IOException here must not
+        // replace the WebApplicationException above or fail a request whose pcaps were already read.
+        try {
+          results.cleanup();
+        } catch (IOException cleanupError) {
+          LOGGER.warn("Unable to cleanup files in HDFS", cleanupError);
+        }
+      }
     }
 
     // return http status '200 OK' along with the complete pcaps response file,
@@ -205,6 +218,7 @@ public class PcapReceiverImplRestEasy {
 
     final boolean includeReverseTrafficF = includeReverseTraffic;
     PcapsResponse response = new PcapsResponse();
+    SequenceFileIterable results = null;
     try {
       if(startTime < 0) {
         startTime = 0L;
@@ -237,22 +251,32 @@ public class PcapReceiverImplRestEasy {
       if(LOGGER.isDebugEnabled()) {
         LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet()));
       }
-      response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
-                                    , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
-                                    , startTime
-                                    , endTime
-                                    , numReducers
-                                    , query
-                                    , CONFIGURATION.get()
-                                    , FileSystem.get(CONFIGURATION.get())
-                                    , new FixedPcapFilter.Configurator()
-                                    )
-                     );
+      results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
+              , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
+              , startTime
+              , endTime
+              , numReducers
+              , query
+              , CONFIGURATION.get()
+              , FileSystem.get(CONFIGURATION.get())
+              , new FixedPcapFilter.Configurator()
+      );
+      response.setPcaps(results != null ? Lists.newArrayList(results) : null);
 
     } catch (Exception e) {
       LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
               e);
       throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+    } finally {
+      if (null != results) {
+        // Deleting the temporary HDFS result files is best effort: an IOException here must not
+        // replace the WebApplicationException above or fail a request whose pcaps were already read.
+        try {
+          results.cleanup();
+        } catch (IOException cleanupError) {
+          LOGGER.warn("Unable to cleanup files in HDFS", cleanupError);
+        }
+      }
     }
 
     // return http status '200 OK' along with the complete pcaps response file,

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index 1c1c236..dba87cf 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.mr.PcapJob;
@@ -31,7 +32,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.EnumMap;
-import java.util.List;
 
 public class PcapReceiverImplRestEasyTest {
 
@@ -44,7 +44,7 @@ public class PcapReceiverImplRestEasyTest {
     PcapFilterConfigurator<R> filterImpl;
 
     @Override
-    public <T> List<byte[]> query( Path basePath
+    public <T> SequenceFileIterable query(Path basePath
             , Path baseOutputPath
             , long beginNS
             , long endNS

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java
new file mode 100644
index 0000000..a57cd35
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.hadoop;
+
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static java.lang.String.format;
+
+/**
+ * Iterates over the {@code BytesWritable} values of a list of SequenceFiles, reading the files
+ * one after another in list order. Each file's reader is opened lazily on the first hasNext()
+ * call of its iterator and closed once that file's records are exhausted or a read fails.
+ */
+public class SequenceFileIterable implements Iterable<byte[]> {
+  private static final Logger LOGGER = Logger.getLogger(SequenceFileIterable.class);
+  private final List<Path> files;
+  private final Configuration config;
+
+  /**
+   * @param files  SequenceFiles to read, in iteration order; also the files removed by cleanup()
+   * @param config Hadoop configuration used to open the readers and to obtain the FileSystem
+   */
+  public SequenceFileIterable(List<Path> files, Configuration config) {
+    this.files = files;
+    this.config = config;
+  }
+
+  /**
+   * Returns a fresh iterator over all records. Every call re-opens the files and starts again from
+   * the first record, so callers should iterate only once where possible.
+   */
+  @Override
+  public Iterator<byte[]> iterator() {
+    return Iterators.concat(getIterators(files, config));
+  }
+
+  // Generic arrays cannot be created, so a raw Iterator[] is built; every element is a
+  // SequenceFileIterator, i.e. an Iterator<byte[]>.
+  @SuppressWarnings("unchecked")
+  private Iterator<byte[]>[] getIterators(List<Path> files, Configuration config) {
+    return files.stream().map(f -> new SequenceFileIterator(f, config)).toArray(Iterator[]::new);
+  }
+
+  /**
+   * Cleans up all files read by this Iterable
+   *
+   * @return true if success, false if any files were not deleted
+   * @throws IOException if the FileSystem cannot be obtained or a delete call fails
+   */
+  public boolean cleanup() throws IOException {
+    FileSystem fileSystem = FileSystem.get(config);
+    boolean success = true;
+    for (Path file : files) {
+      success &= fileSystem.delete(file, false);
+    }
+    return success;
+  }
+
+  /**
+   * Iterator over the values of a single SequenceFile. The reader is opened on the first hasNext()
+   * call and at most one record is buffered in {@code next} between hasNext() and next().
+   */
+  private static class SequenceFileIterator implements Iterator<byte[]> {
+    private final Path path;
+    private final Configuration config;
+    private SequenceFile.Reader reader;
+    private final LongWritable key = new LongWritable();
+    private final BytesWritable value = new BytesWritable();
+    private byte[] next;
+    private boolean finished = false;
+
+    public SequenceFileIterator(Path path, Configuration config) {
+      this.path = path;
+      this.config = config;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (!finished && null == reader) {
+        try {
+          reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(path));
+          LOGGER.debug("Reading file: " + path.toString());
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to get reader", e);
+        }
+      } else if (LOGGER.isDebugEnabled()) {
+        // guarded: next() calls hasNext() for every record, so skip format() when debug is off
+        LOGGER.debug(format("finished=%s, reader=%s, next=%s", finished, reader, next));
+      }
+      try {
+        // Only advance the reader when no record is buffered, so repeated hasNext() calls
+        // without an intervening next() neither skip nor duplicate records.
+        if (!finished) {
+          if (null == next && reader.next(key, value)) {
+            next = value.copyBytes();
+          } else if (null == next) {
+            close();
+          }
+        }
+      } catch (IOException e) {
+        close();
+        throw new RuntimeException("Failed to get next record", e);
+      }
+      return (null != next);
+    }
+
+    /** Marks this iterator as finished and closes the reader; close failures are only logged. */
+    private void close() {
+      LOGGER.debug("Closing file: " + path.toString());
+      finished = true;
+      try {
+        if (reader != null) {
+          reader.close();
+          reader = null;
+        }
+      } catch (IOException e) {
+        // best effort: the records are exhausted or a read already failed, so a failure to
+        // close is logged rather than thrown at the caller
+        LOGGER.warn("Error closing file", e);
+      }
+    }
+
+    @Override
+    public byte[] next() {
+      byte[] ret = null;
+      if (hasNext()) {
+        ret = next;
+        next = null; // clear the buffer so the same record is never returned twice
+      } else {
+        throw new NoSuchElementException("No more records");
+      }
+      return ret;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
index f8ab0ac..294844f 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
@@ -23,22 +23,22 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 
 public class CliConfig {
-  public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
-  public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
   private boolean showHelp;
   private String basePath;
   private String baseOutputPath;
   private long startTime;
   private long endTime;
-  private int numReducers = 0;
+  private int numReducers;
+  private int numRecordsPerFile;
   private DateFormat dateFormat;
 
   public CliConfig() {
     showHelp = false;
-    basePath = BASE_PATH_DEFAULT;
-    baseOutputPath = BASE_OUTPUT_PATH_DEFAULT;
+    basePath = "";
+    baseOutputPath = "";
     startTime = -1L;
     endTime = -1L;
+    numReducers = 0;
   }
 
   public int getNumReducers() {
@@ -100,4 +100,20 @@ public class CliConfig {
   public void setNumReducers(int numReducers) {
     this.numReducers = numReducers;
   }
+
+  /**
+   * @return number of records to write to each output pcap file, as last set via
+   *         {@link #setNumRecordsPerFile(int)} (0 if never set; the constructor leaves it unset)
+   */
+  public int getNumRecordsPerFile() {
+    return numRecordsPerFile;
+  }
+
+  /**
+   * @param numRecordsPerFile number of records to write to each output pcap file; used as the page
+   *                          size for Iterables.partition, so it is expected to be positive
+   */
+  public void setNumRecordsPerFile(int numRecordsPerFile) {
+    this.numRecordsPerFile = numRecordsPerFile;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index ea6f8e7..83e9fcf 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -24,6 +24,10 @@ import org.apache.commons.cli.*;
  * Provides commmon required fields for the PCAP filter jobs
  */
 public class CliParser {
+  public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
+  public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
+  public static final int NUM_REDUCERS_DEFAULT = 10;
+  public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
   private CommandLineParser parser;
 
   public CliParser() {
@@ -33,10 +37,11 @@ public class CliParser {
   public Options buildOptions() {
     Options options = new Options();
     options.addOption(newOption("h", "help", false, "Display help"));
-    options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
-    options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
+    options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT)));
+    options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT)));
     options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
-    options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true));
+    options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT)));
+    options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT)));
     options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
     options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
     return options;
@@ -61,9 +66,13 @@ public class CliParser {
     }
     if (commandLine.hasOption("base_path")) {
       config.setBasePath(commandLine.getOptionValue("base_path"));
+    } else {
+      config.setBasePath(BASE_PATH_DEFAULT);
     }
     if (commandLine.hasOption("base_output_path")) {
       config.setBaseOutputPath(commandLine.getOptionValue("base_output_path"));
+    } else {
+      config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT);
     }
     if (commandLine.hasOption("start_time")) {
       try {
@@ -83,7 +92,19 @@ public class CliParser {
       config.setNumReducers(numReducers);
     }
     else {
-      config.setNumReducers(10);
+      config.setNumReducers(NUM_REDUCERS_DEFAULT);
+    }
+    if (commandLine.hasOption("records_per_file")) {
+      int numRecordsPerFile = Integer.parseInt(commandLine.getOptionValue("records_per_file"));
+      // Reject non-positive page sizes up front: Iterables.partition would otherwise reject them
+      // only after the query results have already been fetched.
+      if (numRecordsPerFile <= 0) {
+        throw new IllegalArgumentException("records_per_file must be a positive integer, got " + numRecordsPerFile);
+      }
+      config.setNumRecordsPerFile(numRecordsPerFile);
+    }
+    else {
+      config.setNumRecordsPerFile(NUM_RECORDS_PER_FILE_DEFAULT);
     }
     if (commandLine.hasOption("end_time")) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index d96e166..d2e6807 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -17,12 +17,14 @@
  */
 package org.apache.metron.pcap.query;
 
+import com.google.common.collect.Iterables;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
@@ -32,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -59,7 +60,7 @@ public class PcapCli {
       return -1;
     }
     String jobType = args[0];
-    List<byte[]> results = new ArrayList<>();
+    SequenceFileIterable results = null;
     String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
     Configuration hadoopConf = new Configuration();
     String[] otherArgs = null;
@@ -69,13 +70,16 @@ public class PcapCli {
       LOGGER.error("Failed to configure hadoop with provided options: " + e.getMessage(), e);
       return -1;
     }
+    CliConfig commonConfig = null;
     if ("fixed".equals(jobType)) {
       FixedCliParser fixedParser = new FixedCliParser();
       FixedCliConfig config = null;
       try {
         config = fixedParser.parse(otherArgs);
+        commonConfig = config;
       } catch (ParseException | java.text.ParseException e) {
         System.err.println(e.getMessage());
+        System.err.flush();
         fixedParser.printHelp();
         return -1;
       }
@@ -110,6 +114,7 @@ public class PcapCli {
       QueryCliConfig config = null;
       try {
         config = queryParser.parse(otherArgs);
+        commonConfig = config;
       } catch (ParseException | java.text.ParseException e) {
         System.err.println(e.getMessage());
         queryParser.printHelp();
@@ -145,18 +150,34 @@ public class PcapCli {
       printBasicHelp();
       return -1;
     }
-    String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
-    String outFileName = String.format("pcap-data-%s.pcap", timestamp);
     try {
-      if(results.size() > 0) {
-        resultsWriter.write(results, outFileName);
-      }
-      else {
+      // One timestamp per run plus a per-page sequence number: pages written within the same
+      // millisecond would otherwise get identical names and overwrite each other.
+      String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
+      int pageNum = 0;
+      if (results != null) {
+        // Iterate the results exactly once: every iterator() call on a SequenceFileIterable
+        // re-opens the files. Iterables.partition materializes one page at a time.
+        for (List<byte[]> data : Iterables.partition(results, commonConfig.getNumRecordsPerFile())) {
+          pageNum++;
+          String outFileName = String.format("pcap-data-%s+%04d.pcap", timestamp, pageNum);
+          resultsWriter.write(data, outFileName);
+        }
+      }
+      if (pageNum == 0) {
         System.out.println("No results returned.");
       }
     } catch (IOException e) {
       LOGGER.error("Unable to write file", e);
       return -1;
+    } finally {
+      if (results != null) {
+        try {
+          results.cleanup();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to cleanup files in HDFS", e);
+        }
+      }
     }
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 17c9325..81725d8 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -19,8 +19,11 @@
 package org.apache.metron.pcap;
 
 import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.pcap.mr.PcapJob;
 import org.junit.Assert;
@@ -30,6 +33,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static java.lang.Long.toUnsignedString;
+import static org.hamcrest.CoreMatchers.equalTo;
+
 public class PcapJobTest {
 
   @Test
@@ -48,6 +54,7 @@ public class PcapJobTest {
       Assert.assertTrue(Iterables.isEmpty(paths));
     }
   }
+
   @Test
   public void test_getPaths_leftEdge() throws Exception {
     PcapJob job;
@@ -63,9 +70,10 @@ public class PcapJobTest {
         }
       };
       Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
-      Assert.assertEquals(1,Iterables.size(paths));
+      Assert.assertEquals(1, Iterables.size(paths));
     }
   }
+
   @Test
   public void test_getPaths_rightEdge() throws Exception {
     PcapJob job;
@@ -80,8 +88,8 @@ public class PcapJobTest {
           return inputFiles;
         }
       };
-      Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L-1L, 1461589333993573000L + 1L);
-      Assert.assertEquals(2,Iterables.size(paths));
+      Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L);
+      Assert.assertEquals(2, Iterables.size(paths));
     }
     {
       final List<Path> inputFiles = new ArrayList<Path>() {{
@@ -95,10 +103,11 @@ public class PcapJobTest {
           return inputFiles;
         }
       };
-      Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L-1L, 1461589334993573000L + 1L);
-      Assert.assertEquals(2,Iterables.size(paths));
+      Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L);
+      Assert.assertEquals(2, Iterables.size(paths));
     }
   }
+
   @Test
   public void test_getPaths_bothEdges() throws Exception {
     PcapJob job;
@@ -115,7 +124,22 @@ public class PcapJobTest {
         }
       };
       Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
-      Assert.assertEquals(3,Iterables.size(paths));
+      Assert.assertEquals(3, Iterables.size(paths));
     }
   }
+
+  @Test
+  public void partition_gives_value_in_range() throws Exception {
+    // All values are nanosecond timestamps. The key lies ~82.6% of the way from start to end,
+    // so if findWidth splits the range into 10 equal-width buckets it belongs in partition 8.
+    long start = 1473897600000000000L;
+    long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
+    Configuration conf = new Configuration();
+    conf.set(PcapJob.START_TS_CONF, toUnsignedString(start));
+    conf.set(PcapJob.END_TS_CONF, toUnsignedString(end));
+    conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
+    PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
+    partitioner.setConf(conf);
+    Assert.assertThat("Partition not in range", partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), equalTo(8));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index d4367ea..0dd07aa 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -271,7 +271,7 @@ public class PcapTopologyIntegrationTest {
       PcapJob job = new PcapJob();
       {
         //Ensure that only two pcaps are returned when we look at 4 and 5
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(4, pcapEntries)
@@ -283,12 +283,12 @@ public class PcapTopologyIntegrationTest {
                         , new FixedPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), 2);
+        Assert.assertEquals(Iterables.size(results), 2);
       }
       {
         // Ensure that only two pcaps are returned when we look at 4 and 5
         // test with empty query filter
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(4, pcapEntries)
@@ -300,11 +300,11 @@ public class PcapTopologyIntegrationTest {
                         , new QueryPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), 2);
+        Assert.assertEquals(Iterables.size(results), 2);
       }
       {
         //ensure that none get returned since that destination IP address isn't in the dataset
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -318,12 +318,12 @@ public class PcapTopologyIntegrationTest {
                         , new FixedPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), 0);
+        Assert.assertEquals(Iterables.size(results), 0);
       }
       {
         // ensure that none get returned since that destination IP address isn't in the dataset
         // test with query filter
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -335,11 +335,11 @@ public class PcapTopologyIntegrationTest {
                         , new QueryPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), 0);
+        Assert.assertEquals(Iterables.size(results), 0);
       }
       {
         //same with protocol as before with the destination addr
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -353,12 +353,12 @@ public class PcapTopologyIntegrationTest {
                         , new FixedPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), 0);
+        Assert.assertEquals(Iterables.size(results), 0);
       }
       {
         //same with protocol as before with the destination addr
         //test with query filter
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -370,11 +370,11 @@ public class PcapTopologyIntegrationTest {
                         , new QueryPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), 0);
+        Assert.assertEquals(Iterables.size(results), 0);
       }
       {
         //make sure I get them all.
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -386,12 +386,12 @@ public class PcapTopologyIntegrationTest {
                         , new FixedPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), pcapEntries.size());
+        Assert.assertEquals(Iterables.size(results), pcapEntries.size());
       }
       {
         //make sure I get them all.
         //with query filter
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -403,10 +403,10 @@ public class PcapTopologyIntegrationTest {
                         , new QueryPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertEquals(results.size(), pcapEntries.size());
+        Assert.assertEquals(Iterables.size(results), pcapEntries.size());
       }
       {
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -420,8 +420,8 @@ public class PcapTopologyIntegrationTest {
                         , new FixedPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertTrue(results.size() > 0);
-        Assert.assertEquals(results.size()
+        Assert.assertTrue(Iterables.size(results) > 0);
+        Assert.assertEquals(Iterables.size(results)
                 , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
                           @Override
                           public boolean apply(@Nullable JSONObject input) {
@@ -432,12 +432,12 @@ public class PcapTopologyIntegrationTest {
                 )
         );
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, results);
+        PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
         Assert.assertTrue(baos.toByteArray().length > 0);
       }
       {
         //test with query filter
-        List<byte[]> results =
+        Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
@@ -449,8 +449,8 @@ public class PcapTopologyIntegrationTest {
                         , new QueryPcapFilter.Configurator()
                 );
         assertInOrder(results);
-        Assert.assertTrue(results.size() > 0);
-        Assert.assertEquals(results.size()
+        Assert.assertTrue(Iterables.size(results) > 0);
+        Assert.assertEquals(Iterables.size(results)
                 , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
                   @Override
                   public boolean apply(@Nullable JSONObject input) {
@@ -462,7 +462,7 @@ public class PcapTopologyIntegrationTest {
         );
         assertInOrder(results);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, results);
+        PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
         Assert.assertTrue(baos.toByteArray().length > 0);
       }
       System.out.println("Ended");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 4d6432e..bad22e4 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
@@ -38,14 +39,12 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.List;
+import java.util.*;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class PcapCliTest {
@@ -71,13 +70,15 @@ public class PcapCliTest {
             "-ip_dst_addr", "192.168.1.2",
             "-ip_src_port", "8081",
             "-ip_dst_port", "8082",
-            "-protocol", "6",
-            "-num_reducers", "10"
+            "-protocol", "6"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+    Iterator iterator = pcaps.iterator();
+    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+    when(iterable.iterator()).thenReturn(iterator);
 
-    Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
-    Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+    Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
+    Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
     EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
       put(Constants.Fields.SRC_ADDR, "192.168.1.1");
       put(Constants.Fields.DST_ADDR, "192.168.1.2");
@@ -87,7 +88,7 @@ public class PcapCliTest {
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
     }};
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -109,9 +110,13 @@ public class PcapCliTest {
             "-ip_dst_port", "8082",
             "-protocol", "6",
             "-include_reverse",
-            "-num_reducers", "10"
+            "-num_reducers", "10",
+            "-records_per_file", "1000"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+    Iterator iterator = pcaps.iterator();
+    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+    when(iterable.iterator()).thenReturn(iterator);
 
     Path base_path = new Path("/base/path");
     Path base_output_path = new Path("/base/output/path");
@@ -124,7 +129,7 @@ public class PcapCliTest {
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
     }};
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -147,9 +152,13 @@ public class PcapCliTest {
             "-ip_dst_port", "8082",
             "-protocol", "6",
             "-include_reverse",
-            "-num_reducers", "10"
+            "-num_reducers", "10",
+            "-records_per_file", "1000"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+    Iterator iterator = pcaps.iterator();
+    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+    when(iterable.iterator()).thenReturn(iterator);
 
     Path base_path = new Path("/base/path");
     Path base_output_path = new Path("/base/output/path");
@@ -164,7 +173,7 @@ public class PcapCliTest {
 
     long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
     long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
-    when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -187,16 +196,18 @@ public class PcapCliTest {
     String[] args = {
             "query",
             "-start_time", "500",
-            "-num_reducers", "10",
             "-query", "some query string"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+    Iterator iterator = pcaps.iterator();
+    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+    when(iterable.iterator()).thenReturn(iterator);
 
-    Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
-    Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+    Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
+    Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
     String query = "some query string";
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -213,15 +224,19 @@ public class PcapCliTest {
             "-num_reducers", "10",
             "-base_path", "/base/path",
             "-base_output_path", "/base/output/path",
-            "-query", "some query string"
+            "-query", "some query string",
+            "-records_per_file", "1000"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+    Iterator iterator = pcaps.iterator();
+    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+    when(iterable.iterator()).thenReturn(iterator);
 
     Path base_path = new Path("/base/path");
     Path base_output_path = new Path("/base/output/path");
     String query = "some query string";
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -229,54 +244,76 @@ public class PcapCliTest {
     Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
   }
 
+  // INVALID OPTION CHECKS
+
   @Test
   public void invalid_fixed_filter_arg_prints_help() throws Exception {
+    String[] args = {
+            "fixed",
+            "-start_time", "500",
+            "-end_time", "1000",
+            "-num_reducers", "10",
+            "-base_path", "/base/path",
+            "-base_output_path", "/base/output/path",
+            "-query", "THIS IS AN ERROR"
+    };
+    assertCliError(args, "Fixed", "Unrecognized option: -query");
+  }
+
+  /**
+   *
+   * @param args PcapJob args
+   * @param type Fixed|Query
+   * @param optMsg Expected error message
+   */
+  public void assertCliError(String[] args, String type, String optMsg) {
     PrintStream originalOutStream = System.out;
+    PrintStream originalErrOutStream = System.err;
     try {
       ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      PrintStream testStream = new PrintStream(new BufferedOutputStream(bos));
-      System.setOut(testStream);
-      String[] args = {
-              "fixed",
-              "-start_time", "500",
-              "-end_time", "1000",
-              "-num_reducers", "10",
-              "-base_path", "/base/path",
-              "-base_output_path", "/base/output/path",
-              "-query", "THIS IS AN ERROR"
-      };
+      PrintStream outStream = new PrintStream(new BufferedOutputStream(bos));
+      System.setOut(outStream);
+
+      ByteArrayOutputStream ebos = new ByteArrayOutputStream();
+      PrintStream errOutStream = new PrintStream(new BufferedOutputStream(ebos));
+      System.setErr(errOutStream);
 
       PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
       assertThat("Expect errors on run", cli.run(args), equalTo(-1));
-      assertThat(bos.toString(), bos.toString().contains("usage: Fixed filter options"), equalTo(true));
+      assertThat("Expect missing required option error: " + ebos.toString(), ebos.toString().contains(optMsg), equalTo(true));
+      assertThat("Expect usage to be printed: " + bos.toString(), bos.toString().contains("usage: " + type + " filter options"), equalTo(true));
     } finally {
       System.setOut(originalOutStream);
+      System.setErr(originalErrOutStream);
     }
   }
 
   @Test
   public void invalid_query_filter_arg_prints_help() throws Exception {
-    PrintStream originalOutStream = System.out;
-    try {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      PrintStream outStream = new PrintStream(new BufferedOutputStream(bos));
-      System.setOut(outStream);
-      String[] args = {
-              "query",
-              "-start_time", "500",
-              "-end_time", "1000",
-              "-num_reducers", "10",
-              "-base_path", "/base/path",
-              "-base_output_path", "/base/output/path",
-              "-ip_src_addr", "THIS IS AN ERROR"
-      };
+    String[] args = {
+            "query",
+            "-start_time", "500",
+            "-end_time", "1000",
+            "-num_reducers", "10",
+            "-base_path", "/base/path",
+            "-base_output_path", "/base/output/path",
+            "-ip_src_addr", "THIS IS AN ERROR"
+    };
+    assertCliError(args, "Query", "");
+  }
 
-      PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
-      assertThat("Expect errors on run", cli.run(args), equalTo(-1));
-      assertThat(bos.toString(), bos.toString().contains("usage: Query filter options"), equalTo(true));
-    } finally {
-      System.setOut(originalOutStream);
-    }
+  @Test
+  public void missing_start_time_arg_prints_error_and_help() throws Exception {
+    String[] args = {
+            "fixed",
+            "-ip_src_addr", "192.168.1.1",
+            "-ip_dst_addr", "192.168.1.2",
+            "-ip_src_port", "8081",
+            "-ip_dst_port", "8082",
+            "-protocol", "6",
+            "-num_reducers", "10"
+    };
+    assertCliError(args, "Fixed", "Missing required option: st");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index cce4074..f874620 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Partitioner;
@@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.log4j.Logger;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.PcapFilter;
@@ -64,6 +64,11 @@ public class PcapJob {
       }
       long x = longWritable.get();
       int ret = (int)Long.divideUnsigned(x - start, width);
+      if(ret > numPartitions) {
+        throw new IllegalArgumentException(String.format("Bad partition: key=%s, width=%d, partition=%d, numPartitions=%d"
+                , Long.toUnsignedString(x), width, ret, numPartitions)
+            );
+      }
       return ret;
     }
 
@@ -176,32 +181,26 @@ public class PcapJob {
     return ret;
   }
 
-  private List<byte[]> readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException {
-    List<byte[]> ret = new ArrayList<>();
-    for(RemoteIterator<LocatedFileStatus> it= fs.listFiles(outputPath, false);it.hasNext();) {
+  /**
+   * Returns a lazily-read Iterable over a set of sequence files
+   */
+  private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException {
+    List<Path> files = new ArrayList<>();
+    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, false); it.hasNext(); ) {
       Path p = it.next().getPath();
-      if(p.getName().equals("_SUCCESS")) {
+      if (p.getName().equals("_SUCCESS")) {
         fs.delete(p, false);
         continue;
       }
-      SequenceFile.Reader reader = new SequenceFile.Reader(config,
-            SequenceFile.Reader.file(p));
-      LongWritable key = new LongWritable();
-      BytesWritable value = new BytesWritable();
-      while(reader.next(key, value)) {
-        ret.add(value.copyBytes());
-      }
-      reader.close();
-      fs.delete(p, false);
+      files.add(p);
     }
-    fs.delete(outputPath, false);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug(outputPath + ": Returning " + ret.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(outputPath);
     }
-    return ret;
+    return new SequenceFileIterable(files, config);
   }
 
-  public <T> List<byte[]> query(Path basePath
+  public <T> SequenceFileIterable query(Path basePath
                             , Path baseOutputPath
                             , long beginNS
                             , long endNS
@@ -240,12 +239,10 @@ public class PcapJob {
     }
   }
 
-
-  public static int findWidth(long start, long end, int numReducers) {
-    return (int)Long.divideUnsigned(end - start, numReducers) + 1;
+  public static long findWidth(long start, long end, int numReducers) {
+    return Long.divideUnsigned(end - start, numReducers) + 1;
   }
 
-
   public <T> Job createJob( Path basePath
                       , Path outputPath
                       , long beginNS

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7e373d..659a467 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,7 +202,7 @@
                         <exclude>metron-ui/lib/public/font/**</exclude>
                         <exclude>metron-ui/node_modules/**</exclude>
 			<!-- pickle file - binary format -->
-			<exclude>metron-deployment/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p</exclude>
+                        <exclude>**/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p</exclude>
                     </excludes>
                 </configuration>
             </plugin>


Mime
View raw message