Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 53BEA200B87 for ; Mon, 19 Sep 2016 15:04:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 51D1F160ACC; Mon, 19 Sep 2016 13:04:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5F389160ABC for ; Mon, 19 Sep 2016 15:04:52 +0200 (CEST) Received: (qmail 65874 invoked by uid 500); 19 Sep 2016 13:04:51 -0000 Mailing-List: contact commits-help@metron.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@metron.incubator.apache.org Delivered-To: mailing list commits@metron.incubator.apache.org Received: (qmail 65865 invoked by uid 99); 19 Sep 2016 13:04:51 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Sep 2016 13:04:51 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 19968C9D25 for ; Mon, 19 Sep 2016 13:04:51 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.324 X-Spam-Level: X-Spam-Status: No, score=-4.324 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RP_MATCHES_RCVD=-1.124] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id poejglDFiTI3 for ; Mon, 19 Sep 2016 13:04:47 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id D7B2C5F24B for ; Mon, 19 Sep 2016 13:04:45 +0000 (UTC) Received: (qmail 65824 invoked by uid 99); 19 Sep 2016 13:04:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Sep 2016 13:04:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EEBB3E0158; Mon, 19 Sep 2016 13:04:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cestella@apache.org To: commits@metron.incubator.apache.org Message-Id: <7227206673184435a1b6c81c81e8c620@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-metron git commit: METRON-257 Enable pcap result pagination from the Pcap CLI (mmiklavc via cestella) closes apache/incubator-metron#256 Date: Mon, 19 Sep 2016 13:04:44 +0000 (UTC) archived-at: Mon, 19 Sep 2016 13:04:54 -0000 Repository: incubator-metron Updated Branches: refs/heads/master 095313255 -> e0c9970b1 METRON-257 Enable pcap result pagination from the Pcap CLI (mmiklavc via cestella) closes apache/incubator-metron#256 Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/e0c9970b Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/e0c9970b Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/e0c9970b Branch: refs/heads/master Commit: e0c9970b1f467135f01ee212fb2a8d7f2d61de8b Parents: 0953132 Author: mmiklavc Authored: Mon Sep 19 09:04:32 2016 -0400 Committer: cstella Committed: Mon Sep 19 09:04:32 2016 -0400 ---------------------------------------------------------------------- .../pcapservice/PcapReceiverImplRestEasy.java | 38 +++-- .../PcapReceiverImplRestEasyTest.java | 4 +- .../common/hadoop/SequenceFileIterable.java | 139 +++++++++++++++++++ .../org/apache/metron/pcap/query/CliConfig.java | 18 ++- .../org/apache/metron/pcap/query/CliParser.java | 24 +++- .../org/apache/metron/pcap/query/PcapCli.java | 31 +++-- .../org/apache/metron/pcap/PcapJobTest.java | 34 ++++- .../PcapTopologyIntegrationTest.java | 48 +++---- .../apache/metron/pcap/query/PcapCliTest.java | 139 ++++++++++++------- .../java/org/apache/metron/pcap/mr/PcapJob.java | 43 +++--- pom.xml | 2 +- 11 files changed, 383 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java index 18b5dc9..5a2a0ae 100644 --- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java +++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java @@ -19,11 +19,13 @@ package org.apache.metron.pcapservice; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.metron.common.Constants; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; @@ -120,6 +122,7 @@ public class PcapReceiverImplRestEasy { throws IOException { PcapsResponse response = new PcapsResponse(); + SequenceFileIterable results = null; try { if (startTime < 0) { startTime = 0L; @@ -137,7 +140,7 @@ public class PcapReceiverImplRestEasy { if(LOGGER.isDebugEnabled()) { LOGGER.debug("Query received: " + query); } - response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) + results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) , startTime , endTime @@ -146,13 +149,17 @@ public class PcapReceiverImplRestEasy { , CONFIGURATION.get() , FileSystem.get(CONFIGURATION.get()) , new QueryPcapFilter.Configurator() - ) ); + response.setPcaps(results != null ? Lists.newArrayList(results) : null); } catch (Exception e) { LOGGER.error("Exception occurred while fetching Pcaps by identifiers :", e); throw new WebApplicationException("Unable to fetch Pcaps via MR job", e); + } finally { + if (null != results) { + results.cleanup(); + } } // return http status '200 OK' along with the complete pcaps response file, @@ -205,6 +212,7 @@ public class PcapReceiverImplRestEasy { final boolean includeReverseTrafficF = includeReverseTraffic; PcapsResponse response = new PcapsResponse(); + SequenceFileIterable results = null; try { if(startTime < 0) { startTime = 0L; @@ -237,22 +245,26 @@ public class PcapReceiverImplRestEasy { if(LOGGER.isDebugEnabled()) { LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet())); } - response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) - , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) - , startTime - , endTime - , numReducers - , query - , CONFIGURATION.get() - , FileSystem.get(CONFIGURATION.get()) - , new FixedPcapFilter.Configurator() - ) - ); + results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) + , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) + , startTime + , endTime + , numReducers + , query + , CONFIGURATION.get() + , FileSystem.get(CONFIGURATION.get()) + , new FixedPcapFilter.Configurator() + ); + response.setPcaps(results != null ? Lists.newArrayList(results) : null); } catch (Exception e) { LOGGER.error("Exception occurred while fetching Pcaps by identifiers :", e); throw new WebApplicationException("Unable to fetch Pcaps via MR job", e); + } finally { + if (null != results) { + results.cleanup(); + } } // return http status '200 OK' along with the complete pcaps response file, http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java index 1c1c236..dba87cf 100644 --- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java +++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.mr.PcapJob; @@ -31,7 +32,6 @@ import org.junit.Test; import java.io.IOException; import java.util.EnumMap; -import java.util.List; public class PcapReceiverImplRestEasyTest { @@ -44,7 +44,7 @@ public class PcapReceiverImplRestEasyTest { PcapFilterConfigurator filterImpl; @Override - public List query( Path basePath + public SequenceFileIterable query(Path basePath , Path baseOutputPath , long beginNS , long endNS http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java new file mode 100644 index 0000000..a57cd35 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.hadoop; + +import com.google.common.collect.Iterators; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static java.lang.String.format; + +public class SequenceFileIterable implements Iterable { + private static final Logger LOGGER = Logger.getLogger(SequenceFileIterable.class); + private List files; + private Configuration config; + + public SequenceFileIterable(List files, Configuration config) { + this.files = files; + this.config = config; + } + + @Override + public Iterator iterator() { + return Iterators.concat(getIterators(files, config)); + } + + private Iterator[] getIterators(List files, Configuration config) { + return files.stream().map(f -> new SequenceFileIterator(f, config)).toArray(Iterator[]::new); + } + + /** + * Cleans up all files read by this Iterable + * + * @return true if success, false if any files were not deleted + * @throws IOException + */ + public boolean cleanup() throws IOException { + FileSystem fileSystem = FileSystem.get(config); + boolean success = true; + for (Path file : files) { + success &= fileSystem.delete(file, false); + } + return success; + } + + private static class SequenceFileIterator implements Iterator { + private Path path; + private Configuration config; + private SequenceFile.Reader reader; + private LongWritable key = new LongWritable(); + private BytesWritable value = new BytesWritable(); + private byte[] next; + private boolean finished = false; + + public SequenceFileIterator(Path path, Configuration config) { + this.path = path; + this.config = config; + } + + @Override + public boolean hasNext() { + if (!finished && null == reader) { + try { + reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(path)); + LOGGER.debug("Writing file: " + path.toString()); + } catch (IOException e) { + throw new RuntimeException("Failed to get reader", e); + } + } else { + LOGGER.debug(format("finished=%s, reader=%s, next=%s", finished, reader, next)); + } + try { + //ensure hasnext is idempotent + if (!finished) { + if (null == next && reader.next(key, value)) { + next = value.copyBytes(); + } else if (null == next) { + close(); + } + } + } catch (IOException e) { + close(); + throw new RuntimeException("Failed to get next record", e); + } + return (null != next); + } + + private void close() { + LOGGER.debug("Closing file: " + path.toString()); + finished = true; + try { + if (reader != null) { + reader.close(); + reader = null; + } + } catch (IOException e) { + // ah well, we tried... + LOGGER.warn("Error closing file", e); + } + } + + @Override + public byte[] next() { + byte[] ret = null; + if (hasNext()) { + ret = next; + next = null; //don't want same record more than once + } else { + throw new NoSuchElementException("No more records"); + } + return ret; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java index f8ab0ac..294844f 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java @@ -23,22 +23,22 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; public class CliConfig { - public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap"; - public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp"; private boolean showHelp; private String basePath; private String baseOutputPath; private long startTime; private long endTime; - private int numReducers = 0; + private int numReducers; + private int numRecordsPerFile; private DateFormat dateFormat; public CliConfig() { showHelp = false; - basePath = BASE_PATH_DEFAULT; - baseOutputPath = BASE_OUTPUT_PATH_DEFAULT; + basePath = ""; + baseOutputPath = ""; startTime = -1L; endTime = -1L; + numReducers = 0; } public int getNumReducers() { @@ -100,4 +100,12 @@ public class CliConfig { public void setNumReducers(int numReducers) { this.numReducers = numReducers; } + + public int getNumRecordsPerFile() { + return numRecordsPerFile; + } + + public void setNumRecordsPerFile(int numRecordsPerFile) { + this.numRecordsPerFile = numRecordsPerFile; + } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index ea6f8e7..83e9fcf 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -24,6 +24,10 @@ import org.apache.commons.cli.*; * Provides commmon required fields for the PCAP filter jobs */ public class CliParser { + public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap"; + public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp"; + public static final int NUM_REDUCERS_DEFAULT = 10; + public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000; private CommandLineParser parser; public CliParser() { @@ -33,10 +37,11 @@ public class CliParser { public Options buildOptions() { Options options = new Options(); options.addOption(newOption("h", "help", false, "Display help")); - options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT))); - options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT))); + options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT))); + options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT))); options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true)); - options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true)); + options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT))); + options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT))); options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time.")); options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch.")); return options; @@ -61,9 +66,13 @@ public class CliParser { } if (commandLine.hasOption("base_path")) { config.setBasePath(commandLine.getOptionValue("base_path")); + } else { + config.setBasePath(BASE_PATH_DEFAULT); } if (commandLine.hasOption("base_output_path")) { config.setBaseOutputPath(commandLine.getOptionValue("base_output_path")); + } else { + config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT); } if (commandLine.hasOption("start_time")) { try { @@ -83,7 +92,14 @@ public class CliParser { config.setNumReducers(numReducers); } else { - config.setNumReducers(10); + config.setNumReducers(NUM_REDUCERS_DEFAULT); + } + if (commandLine.hasOption("records_per_file")) { + int numRecordsPerFile = Integer.parseInt(commandLine.getOptionValue("records_per_file")); + config.setNumRecordsPerFile(numRecordsPerFile); + } + else { + config.setNumRecordsPerFile(NUM_RECORDS_PER_FILE_DEFAULT); } if (commandLine.hasOption("end_time")) { try { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index d96e166..d2e6807 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -17,12 +17,14 @@ */ package org.apache.metron.pcap.query; +import com.google.common.collect.Iterables; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; @@ -32,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -59,7 +60,7 @@ public class PcapCli { return -1; } String jobType = args[0]; - List results = new ArrayList<>(); + SequenceFileIterable results = null; String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); Configuration hadoopConf = new Configuration(); String[] otherArgs = null; @@ -69,13 +70,16 @@ public class PcapCli { LOGGER.error("Failed to configure hadoop with provided options: " + e.getMessage(), e); return -1; } + CliConfig commonConfig = null; if ("fixed".equals(jobType)) { FixedCliParser fixedParser = new FixedCliParser(); FixedCliConfig config = null; try { config = fixedParser.parse(otherArgs); + commonConfig = config; } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); + System.err.flush(); fixedParser.printHelp(); return -1; } @@ -110,6 +114,7 @@ public class PcapCli { QueryCliConfig config = null; try { config = queryParser.parse(otherArgs); + commonConfig = config; } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); queryParser.printHelp(); @@ -145,18 +150,28 @@ public class PcapCli { printBasicHelp(); return -1; } - String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"); - String outFileName = String.format("pcap-data-%s.pcap", timestamp); try { - if(results.size() > 0) { - resultsWriter.write(results, outFileName); - } - else { + Iterable> partitions = Iterables.partition(results, commonConfig.getNumRecordsPerFile()); + if (partitions.iterator().hasNext()) { + for (List data : partitions) { + String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"); + String outFileName = String.format("pcap-data-%s.pcap", timestamp); + if(data.size() > 0) { + resultsWriter.write(data, outFileName); + } + } + } else { System.out.println("No results returned."); } } catch (IOException e) { LOGGER.error("Unable to write file", e); return -1; + } finally { + try { + results.cleanup(); + } catch(IOException e) { + LOGGER.warn("Unable to cleanup files in HDFS", e); + } } return 0; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 17c9325..81725d8 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -19,8 +19,11 @@ package org.apache.metron.pcap; import com.google.common.collect.Iterables; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.mr.PcapJob; import org.junit.Assert; @@ -30,6 +33,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static java.lang.Long.toUnsignedString; +import static org.hamcrest.CoreMatchers.equalTo; + public class PcapJobTest { @Test @@ -48,6 +54,7 @@ public class PcapJobTest { Assert.assertTrue(Iterables.isEmpty(paths)); } } + @Test public void test_getPaths_leftEdge() throws Exception { PcapJob job; @@ -63,9 +70,10 @@ public class PcapJobTest { } }; Iterable paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); - Assert.assertEquals(1,Iterables.size(paths)); + Assert.assertEquals(1, Iterables.size(paths)); } } + @Test public void test_getPaths_rightEdge() throws Exception { PcapJob job; @@ -80,8 +88,8 @@ public class PcapJobTest { return inputFiles; } }; - Iterable paths = job.getPaths(null, null, 1461589333993573000L-1L, 1461589333993573000L + 1L); - Assert.assertEquals(2,Iterables.size(paths)); + Iterable paths = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L); + Assert.assertEquals(2, Iterables.size(paths)); } { final List inputFiles = new ArrayList() {{ @@ -95,10 +103,11 @@ public class PcapJobTest { return inputFiles; } }; - Iterable paths = job.getPaths(null, null, 1461589334993573000L-1L, 1461589334993573000L + 1L); - Assert.assertEquals(2,Iterables.size(paths)); + Iterable paths = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L); + Assert.assertEquals(2, Iterables.size(paths)); } } + @Test public void test_getPaths_bothEdges() throws Exception { PcapJob job; @@ -115,7 +124,20 @@ public class PcapJobTest { } }; Iterable paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); - Assert.assertEquals(3,Iterables.size(paths)); + Assert.assertEquals(3, Iterables.size(paths)); } } + + @Test + public void partition_gives_value_in_range() throws Exception { + long start = 1473897600000000000L; + long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); + Configuration conf = new Configuration(); + conf.set(PcapJob.START_TS_CONF, toUnsignedString(start)); + conf.set(PcapJob.END_TS_CONF, toUnsignedString(end)); + conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); + PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); + partitioner.setConf(conf); + Assert.assertThat("Partition not in range", partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), equalTo(8)); + } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index d4367ea..0dd07aa 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -271,7 +271,7 @@ public class PcapTopologyIntegrationTest { PcapJob job = new PcapJob(); { //Ensure that only two pcaps are returned when we look at 4 and 5 - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) @@ -283,12 +283,12 @@ public class PcapTopologyIntegrationTest { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 2); + Assert.assertEquals(Iterables.size(results), 2); } { // Ensure that only two pcaps are returned when we look at 4 and 5 // test with empty query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) @@ -300,11 +300,11 @@ public class PcapTopologyIntegrationTest { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 2); + Assert.assertEquals(Iterables.size(results), 2); } { //ensure that none get returned since that destination IP address isn't in the dataset - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -318,12 +318,12 @@ public class PcapTopologyIntegrationTest { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { // ensure that none get returned since that destination IP address isn't in the dataset // test with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -335,11 +335,11 @@ public class PcapTopologyIntegrationTest { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { //same with protocol as before with the destination addr - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -353,12 +353,12 @@ public class PcapTopologyIntegrationTest { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { //same with protocol as before with the destination addr //test with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -370,11 +370,11 @@ public class PcapTopologyIntegrationTest { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { //make sure I get them all. - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -386,12 +386,12 @@ public class PcapTopologyIntegrationTest { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), pcapEntries.size()); + Assert.assertEquals(Iterables.size(results), pcapEntries.size()); } { //make sure I get them all. //with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -403,10 +403,10 @@ public class PcapTopologyIntegrationTest { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), pcapEntries.size()); + Assert.assertEquals(Iterables.size(results), pcapEntries.size()); } { - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -420,8 +420,8 @@ public class PcapTopologyIntegrationTest { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertTrue(results.size() > 0); - Assert.assertEquals(results.size() + Assert.assertTrue(Iterables.size(results) > 0); + Assert.assertEquals(Iterables.size(results) , Iterables.size(filterPcaps(pcapEntries, new Predicate() { @Override public boolean apply(@Nullable JSONObject input) { @@ -432,12 +432,12 @@ public class PcapTopologyIntegrationTest { ) ); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, results); + PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next()); Assert.assertTrue(baos.toByteArray().length > 0); } { //test with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -449,8 +449,8 @@ public class PcapTopologyIntegrationTest { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertTrue(results.size() > 0); - Assert.assertEquals(results.size() + Assert.assertTrue(Iterables.size(results) > 0); + Assert.assertEquals(Iterables.size(results) , Iterables.size(filterPcaps(pcapEntries, new Predicate() { @Override public boolean apply(@Nullable JSONObject input) { @@ -462,7 +462,7 @@ public class PcapTopologyIntegrationTest { ); assertInOrder(results); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, results); + PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next()); Assert.assertTrue(baos.toByteArray().length > 0); } System.out.println("Ended"); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 4d6432e..bad22e4 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; @@ -38,14 +39,12 @@ import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.EnumMap; -import java.util.List; +import java.util.*; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.*; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class PcapCliTest { @@ -71,13 +70,15 @@ public class PcapCliTest { "-ip_dst_addr", "192.168.1.2", "-ip_src_port", "8081", "-ip_dst_port", "8082", - "-protocol", "6", - "-num_reducers", "10" + "-protocol", "6" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); - Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT); - Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT); + Path base_path = new Path(CliParser.BASE_PATH_DEFAULT); + Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); EnumMap query = new EnumMap(Constants.Fields.class) {{ put(Constants.Fields.SRC_ADDR, "192.168.1.1"); put(Constants.Fields.DST_ADDR, "192.168.1.2"); @@ -87,7 +88,7 @@ public class PcapCliTest { put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -109,9 +110,13 @@ public class PcapCliTest { "-ip_dst_port", "8082", "-protocol", "6", "-include_reverse", - "-num_reducers", "10" + "-num_reducers", "10", + "-records_per_file", "1000" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); @@ -124,7 +129,7 @@ public class PcapCliTest { put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -147,9 +152,13 @@ public class PcapCliTest { "-ip_dst_port", "8082", "-protocol", "6", "-include_reverse", - "-num_reducers", "10" + "-num_reducers", "10", + "-records_per_file", "1000" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); @@ -164,7 +173,7 @@ public class PcapCliTest { long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss"); long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss"); - when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -187,16 +196,18 @@ public class PcapCliTest { String[] args = { "query", "-start_time", "500", - "-num_reducers", "10", "-query", "some query string" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); - Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT); - Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT); + Path base_path = new Path(CliParser.BASE_PATH_DEFAULT); + Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -213,15 +224,19 @@ public class PcapCliTest { "-num_reducers", "10", "-base_path", "/base/path", "-base_output_path", "/base/output/path", - "-query", "some query string" + "-query", "some query string", + "-records_per_file", "1000" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -229,54 +244,76 @@ public class PcapCliTest { Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap"); } + // INVALID OPTION CHECKS + @Test public void invalid_fixed_filter_arg_prints_help() throws Exception { + String[] args = { + "fixed", + "-start_time", "500", + "-end_time", "1000", + "-num_reducers", "10", + "-base_path", "/base/path", + "-base_output_path", "/base/output/path", + "-query", "THIS IS AN ERROR" + }; + assertCliError(args, "Fixed", "Unrecognized option: -query"); + } + + /** + * + * @param args PcapJob args + * @param type Fixed|Query + * @param optMsg Expected error message + */ + public void assertCliError(String[] args, String type, String optMsg) { PrintStream originalOutStream = System.out; + PrintStream originalErrOutStream = System.err; try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintStream testStream = new PrintStream(new BufferedOutputStream(bos)); - System.setOut(testStream); - String[] args = { - "fixed", - "-start_time", "500", - "-end_time", "1000", - "-num_reducers", "10", - "-base_path", "/base/path", - "-base_output_path", "/base/output/path", - "-query", "THIS IS AN ERROR" - }; + PrintStream outStream = new PrintStream(new BufferedOutputStream(bos)); + System.setOut(outStream); + + ByteArrayOutputStream ebos = new ByteArrayOutputStream(); + PrintStream errOutStream = new PrintStream(new BufferedOutputStream(ebos)); + System.setErr(errOutStream); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); assertThat("Expect errors on run", cli.run(args), equalTo(-1)); - assertThat(bos.toString(), bos.toString().contains("usage: Fixed filter options"), equalTo(true)); + assertThat("Expect missing required option error: " + ebos.toString(), ebos.toString().contains(optMsg), equalTo(true)); + assertThat("Expect usage to be printed: " + bos.toString(), bos.toString().contains("usage: " + type + " filter options"), equalTo(true)); } finally { System.setOut(originalOutStream); + System.setErr(originalErrOutStream); } } @Test public void invalid_query_filter_arg_prints_help() throws Exception { - PrintStream originalOutStream = System.out; - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintStream outStream = new PrintStream(new BufferedOutputStream(bos)); - System.setOut(outStream); - String[] args = { - "query", - "-start_time", "500", - "-end_time", "1000", - "-num_reducers", "10", - "-base_path", "/base/path", - "-base_output_path", "/base/output/path", - "-ip_src_addr", "THIS IS AN ERROR" - }; + String[] args = { + "query", + "-start_time", "500", + "-end_time", "1000", + "-num_reducers", "10", + "-base_path", "/base/path", + "-base_output_path", "/base/output/path", + "-ip_src_addr", "THIS IS AN ERROR" + }; + assertCliError(args, "Query", ""); + } - PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); - assertThat("Expect errors on run", cli.run(args), equalTo(-1)); - assertThat(bos.toString(), bos.toString().contains("usage: Query filter options"), equalTo(true)); - } finally { - System.setOut(originalOutStream); - } + @Test + public void missing_start_time_arg_prints_error_and_help() throws Exception { + String[] args = { + "fixed", + "-ip_src_addr", "192.168.1.1", + "-ip_dst_addr", "192.168.1.2", + "-ip_src_port", "8081", + "-ip_dst_port", "8082", + "-protocol", "6", + "-num_reducers", "10" + }; + assertCliError(args, "Fixed", "Missing required option: st"); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index cce4074..f874620 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; @@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.log4j.Logger; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; @@ -64,6 +64,11 @@ public class PcapJob { } long x = longWritable.get(); int ret = (int)Long.divideUnsigned(x - start, width); + if(ret > numPartitions) { + throw new IllegalArgumentException(String.format("Bad partition: key=%s, width=%d, partition=%d, numPartitions=%d" + , Long.toUnsignedString(x), width, ret, numPartitions) + ); + } return ret; } @@ -176,32 +181,26 @@ public class PcapJob { return ret; } - private List readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { - List ret = new ArrayList<>(); - for(RemoteIterator it= fs.listFiles(outputPath, false);it.hasNext();) { + /** + * Returns a lazily-read Iterable over a set of sequence files + */ + private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { + List files = new ArrayList<>(); + for (RemoteIterator it = fs.listFiles(outputPath, false); it.hasNext(); ) { Path p = it.next().getPath(); - if(p.getName().equals("_SUCCESS")) { + if (p.getName().equals("_SUCCESS")) { fs.delete(p, false); continue; } - SequenceFile.Reader reader = new SequenceFile.Reader(config, - SequenceFile.Reader.file(p)); - LongWritable key = new LongWritable(); - BytesWritable value = new BytesWritable(); - while(reader.next(key, value)) { - ret.add(value.copyBytes()); - } - reader.close(); - fs.delete(p, false); + files.add(p); } - fs.delete(outputPath, false); - if(LOG.isDebugEnabled()) { - LOG.debug(outputPath + ": Returning " + ret.size()); + if (LOG.isDebugEnabled()) { + LOG.debug(outputPath); } - return ret; + return new SequenceFileIterable(files, config); } - public List query(Path basePath + public SequenceFileIterable query(Path basePath , Path baseOutputPath , long beginNS , long endNS @@ -240,12 +239,10 @@ public class PcapJob { } } - - public static int findWidth(long start, long end, int numReducers) { - return (int)Long.divideUnsigned(end - start, numReducers) + 1; + public static long findWidth(long start, long end, int numReducers) { + return Long.divideUnsigned(end - start, numReducers) + 1; } - public Job createJob( Path basePath , Path outputPath , long beginNS http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d7e373d..659a467 100644 --- a/pom.xml +++ b/pom.xml @@ -202,7 +202,7 @@ metron-ui/lib/public/font/** metron-ui/node_modules/** - metron-deployment/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p + **/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p