metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmiklav...@apache.org
Subject metron git commit: METRON-1732: Fix job status liveness bug and parallelize finalizer file writing (mmiklavc via mmiklavc) closes apache/metron#1157
Date Wed, 15 Aug 2018 17:30:36 GMT
Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1554-pcap-query-panel 8a926dd5a -> d9e1f381c


METRON-1732: Fix job status liveness bug and parallelize finalizer file writing (mmiklavc via mmiklavc) closes apache/metron#1157


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d9e1f381
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d9e1f381
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d9e1f381

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: d9e1f381cc1ae9edfbffc7216abffdbd8f942c09
Parents: 8a926dd
Author: mmiklavc <michael.miklavcic@gmail.com>
Authored: Wed Aug 15 11:30:10 2018 -0600
Committer: Michael Miklavcic <michael.miklavcic@gmail.com>
Committed: Wed Aug 15 11:30:10 2018 -0600

----------------------------------------------------------------------
 .../CURRENT/configuration/metron-rest-env.xml   | 14 +++
 .../package/scripts/params/params_linux.py      |  1 +
 .../METRON/CURRENT/package/templates/metron.j2  |  1 +
 .../METRON/CURRENT/themes/metron_theme.json     | 10 +++
 metron-interface/metron-rest/README.md          | 10 +++
 .../src/main/config/rest_application.yml        |  1 +
 .../apache/metron/rest/MetronRestConstants.java |  1 +
 .../rest/service/impl/PcapServiceImpl.java      | 25 +++---
 .../apache/metron/rest/mock/MockPcapJob.java    | 18 ++--
 .../rest/service/impl/PcapServiceImplTest.java  | 49 ++++++-----
 .../org/apache/metron/pcap/query/CliParser.java | 10 ++-
 .../PcapTopologyIntegrationTest.java            |  1 +
 .../apache/metron/pcap/query/PcapCliTest.java   | 21 ++---
 .../java/org/apache/metron/pcap/PcapPages.java  |  2 +-
 .../apache/metron/pcap/config/PcapConfig.java   |  6 ++
 .../metron/pcap/config/PcapGlobalDefaults.java  |  1 +
 .../apache/metron/pcap/config/PcapOptions.java  |  3 +-
 .../metron/pcap/finalizer/PcapFinalizer.java    | 70 +++++++++++++--
 .../java/org/apache/metron/pcap/mr/PcapJob.java | 90 ++++++++++++++------
 .../org/apache/metron/pcap/mr/PcapJobTest.java  | 16 +++-
 20 files changed, 258 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
index 895c091..767afa3 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
@@ -201,5 +201,19 @@
             <empty-value-valid>true</empty-value-valid>
         </value-attributes>
     </property>
+    <property>
+        <name>pcap_finalizer_threadpool_size</name>
+        <display-name>Pcap Finalizer Threadpool Size</display-name>
+        <description>The number of threads to use when finalizing Pcap jobs. This affects parallelism
+          around writing out paged files to their final location.
+          If it's a string and ends with "C", then strip the C and treat it as an integral multiple of
+          the number of cores. If it's a string and does not end with a C, then treat it as a number in
+          string form.
+        </description>
+        <value>1</value>
+        <value-attributes>
+            <empty-value-valid>false</empty-value-valid>
+        </value-attributes>
+    </property>
 
 </configuration>

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 4f8a9a7..115a54c 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -391,6 +391,7 @@ pcap_base_interim_result_path = config['configurations']['metron-rest-env']['pca
 pcap_final_output_path = config['configurations']['metron-rest-env']['pcap_final_output_path']
 pcap_page_size = config['configurations']['metron-rest-env']['pcap_page_size']
 pcap_yarn_queue = config['configurations']['metron-rest-env']['pcap_yarn_queue']
+pcap_finalizer_threadpool_size= config['configurations']['metron-rest-env']['pcap_finalizer_threadpool_size']
 pcap_configured_flag_file = status_params.pcap_configured_flag_file
 
 # MapReduce

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
index 55422d0..a7d01e5 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
@@ -45,3 +45,4 @@ PCAP_BASE_INTERIM_RESULT_PATH="{{pcap_base_interim_result_path}}"
 PCAP_FINAL_OUTPUT_PATH="{{pcap_final_output_path}}"
 PCAP_PAGE_SIZE="{{pcap_page_size}}"
 PCAP_YARN_QUEUE="{{pcap_yarn_queue}}"
+PCAP_FINALIZER_THREADPOOL_SIZE="{{pcap_finalizer_threadpool_size}}"

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index db06b61..2b64f8f 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -828,6 +828,10 @@
           "subsection-name": "subsection-rest"
         },
         {
+          "config": "metron-rest-env/pcap_finalizer_threadpool_size",
+          "subsection-name": "subsection-rest"
+        },
+        {
           "config": "metron-management-ui-env/metron_management_ui_port",
           "subsection-name": "subsection-management-ui"
         },
@@ -1441,6 +1445,12 @@
         }
       },
       {
+        "config": "metron-rest-env/pcap_finalizer_threadpool_size",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
         "config": "metron-management-ui-env/metron_management_ui_port",
         "widget": {
           "type": "text-field"

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index d19d8c3..080422d 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -223,6 +223,16 @@ REST will supply the script with raw pcap data through standard in and expects P
 
 Pcap query jobs can be configured for submission to a YARN queue.  This setting is exposed as the Spring property `pcap.yarn.queue`.  If configured, the REST application will set the `mapreduce.job.queuename` Hadoop property to that value.
 
+Queries can also be configured on a global level for setting the number of results per page via a Spring property `pcap.page.size`. By default, this value is set to 10 pcaps per page, but you may choose to set this value higher
+based on observing frequenetly-run query result sizes. This setting works in conjunction with the property for setting finalizer threadpool size when optimizing query performance.
+
+Pcap query jobs have a finalization routine that writes their results out to HDFS in pages. Depending on the size of your pcaps, the number or results typically returned, page sizing (described above), and available CPU cores for running
+your REST application, your performance can be improved by adjusting the number of files that can be written to HDFS in parallel. To this end, there is a threadpool used for this finalization step that can be configured to use a specified
+number of threads. This setting is exposed as the Spring property `pcap.finalizer.threadpool.size`. A default value of "1" is used if not specified by the user. Generally speaking, you should see a performance gain when this value is set
+to anything higher than 1. A sizeable increase in performance can be achieved, especially for larger numbers of files of smaller size, by increasing the number of threads. It should be noted that this property is parsed as a String to allow
+for more complex parallelism values. In addition to normal integer values, you can specify a multiple of the number of cores. If it's a string and ends with "C", then strip the C and treat it as an integral multiple of the number of cores.
+If it's a string and does not end with a C, then treat it as a number in string form.
+
 ## API
 
 Request and Response objects are JSON formatted.  The JSON schemas are available in the Swagger UI.

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-interface/metron-rest/src/main/config/rest_application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml
index e25ad82..84efc01 100644
--- a/metron-interface/metron-rest/src/main/config/rest_application.yml
+++ b/metron-interface/metron-rest/src/main/config/rest_application.yml
@@ -61,3 +61,4 @@ pcap:
   final.output.path: ${PCAP_FINAL_OUTPUT_PATH}
   page.size: ${PCAP_PAGE_SIZE}
   yarn.queue: ${PCAP_YARN_QUEUE}
+  finalizer.threadpool.size: ${PCAP_FINALIZER_THREADPOOL_SIZE}

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 02655298..e3bf698 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -82,4 +82,5 @@ public class MetronRestConstants {
   public static final String PCAP_PAGE_SIZE_SPRING_PROPERTY = "pcap.page.size";
   public static final String PCAP_PDML_SCRIPT_PATH_SPRING_PROPERTY = "pcap.pdml.script.path";
   public static final String PCAP_YARN_QUEUE_SPRING_PROPERTY = "pcap.yarn.queue";
+  public static final String PCAP_FINALIZER_THREADPOOL_SIZE_SPRING_PROPERTY = "pcap.finalizer.threadpool.size";
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
index db2e17b..d5468d4 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
@@ -17,7 +17,17 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import static org.apache.metron.rest.MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY;
+
 import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,7 +41,6 @@ import org.apache.metron.job.manager.JobManager;
 import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.config.PcapJobSupplier;
@@ -47,19 +56,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Service;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-import static org.apache.metron.rest.MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY;
-
 @Service
 public class PcapServiceImpl implements PcapService {
 
@@ -274,6 +270,7 @@ public class PcapServiceImpl implements PcapService {
     }
 
     PcapOptions.NUM_RECORDS_PER_FILE.put(pcapRequest, Integer.parseInt(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY)));
+    PcapOptions.FINALIZER_THREADPOOL_SIZE.put(pcapRequest, environment.getProperty(MetronRestConstants.PCAP_FINALIZER_THREADPOOL_SIZE_SPRING_PROPERTY));
   }
 
   protected FileSystem getFileSystem() throws IOException {

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
index c977faa..1fdf45e 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
@@ -17,6 +17,11 @@
  */
 package org.apache.metron.rest.mock;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -29,12 +34,6 @@ import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.mr.PcapJob;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class MockPcapJob extends PcapJob<Path> {
 
   private String basePath;
@@ -46,6 +45,7 @@ public class MockPcapJob extends PcapJob<Path> {
   private Map<String, String> fixedFields;
   private PcapFilterConfigurator filterImpl;
   private int recPerFile;
+  private String finalizerThreadpoolSize;
   private String query;
   private String yarnQueue;
   private Statusable<Path> statusable;
@@ -72,6 +72,7 @@ public class MockPcapJob extends PcapJob<Path> {
     this.filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
     this.recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(configuration, Integer.class);
     this.yarnQueue = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class).get(MRJobConfig.QUEUE_NAME);
+    this.finalizerThreadpoolSize = PcapOptions.FINALIZER_THREADPOOL_SIZE.get(configuration, String.class);
     return statusable;
   }
 
@@ -152,4 +153,9 @@ public class MockPcapJob extends PcapJob<Path> {
   public String getYarnQueue() {
     return yarnQueue;
   }
+
+  public String getFinalizerThreadpoolSize() {
+    return finalizerThreadpoolSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
index 6635598..8cd363a 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
@@ -17,12 +17,31 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyVararg;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.job.JobException;
@@ -43,10 +62,10 @@ import org.apache.metron.rest.mock.MockPcapJob;
 import org.apache.metron.rest.mock.MockPcapJobSupplier;
 import org.apache.metron.rest.model.pcap.FixedPcapOptions;
 import org.apache.metron.rest.model.pcap.FixedPcapRequest;
-import org.apache.metron.rest.model.pcap.QueryPcapOptions;
-import org.apache.metron.rest.model.pcap.QueryPcapRequest;
 import org.apache.metron.rest.model.pcap.PcapStatus;
 import org.apache.metron.rest.model.pcap.Pdml;
+import org.apache.metron.rest.model.pcap.QueryPcapOptions;
+import org.apache.metron.rest.model.pcap.QueryPcapRequest;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Before;
@@ -59,28 +78,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.core.env.Environment;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyVararg;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
 @SuppressWarnings("ALL")
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({PcapToPdmlScriptWrapper.class, ProcessBuilder.class})
@@ -195,6 +192,7 @@ public class PcapServiceImplTest {
     when(environment.getProperty(MetronRestConstants.PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY)).thenReturn("/base/interim/result/path");
     when(environment.getProperty(MetronRestConstants.PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY)).thenReturn("/final/output/path");
     when(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY)).thenReturn("100");
+    when(environment.getProperty(MetronRestConstants.PCAP_FINALIZER_THREADPOOL_SIZE_SPRING_PROPERTY)).thenReturn("2C");
     when(environment.getProperty(MetronRestConstants.PCAP_PDML_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/path/to/pdml/script");
     when(environment.getProperty(MetronRestConstants.USER_JOB_LIMIT_SPRING_PROPERTY, Integer.class, 1)).thenReturn(1);
   }
@@ -255,6 +253,7 @@ public class PcapServiceImplTest {
     Assert.assertEquals(2, mockPcapJob.getNumReducers());
     Assert.assertEquals(100, mockPcapJob.getRecPerFile());
     Assert.assertEquals("pcap", mockPcapJob.getYarnQueue());
+    Assert.assertEquals("2C", mockPcapJob.getFinalizerThreadpoolSize());
     Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
     Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
     Assert.assertEquals("ip_src_addr", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName()));

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index b9a2a50..9c338cb 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -18,8 +18,9 @@
 
 package org.apache.metron.pcap.query;
 
-import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
 import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_FINALIZER_THREADS_DEFAULT;
 import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT;
 import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT;
 
@@ -56,6 +57,7 @@ public class CliParser {
     options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
     options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
     options.addOption(newOption("yq", "yarn_queue", true, "Yarn queue this job will be submitted to"));
+    options.addOption(newOption("ft", "finalizer_threads", true, "Number of threads to use for the final output writing."));
     return options;
   }
 
@@ -129,6 +131,12 @@ public class CliParser {
     if (commandLine.hasOption("yarn_queue")) {
       config.setYarnQueue(commandLine.getOptionValue("yarn_queue"));
     }
+    if (commandLine.hasOption("finalizer_threads")) {
+      String numThreads = commandLine.getOptionValue("finalizer_threads");
+      config.setFinalizerThreadpoolSize(numThreads);
+    } else {
+      config.setFinalizerThreadpoolSize(NUM_FINALIZER_THREADS_DEFAULT);
+    }
   }
 
   public void printHelp(String msg, Options opts) {

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index c30267d..a8f0676 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -306,6 +306,7 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
     PcapOptions.NUM_REDUCERS.put(configuration, 10);
     PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
     PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath()));
+    PcapOptions.FINALIZER_THREADPOOL_SIZE.put(configuration, 4);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 3e7aad2..ec06c52 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.metron.pcap.query;
 
-import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
 import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.argThat;
@@ -33,17 +33,11 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.system.Clock;
@@ -172,7 +166,8 @@ public class PcapCliTest {
             "-protocol", "6",
             "-include_reverse",
             "-num_reducers", "10",
-            "-records_per_file", "1000"
+            "-records_per_file", "1000",
+            "-finalizer_threads", "10"
     };
     Map<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
@@ -191,6 +186,7 @@ public class PcapCliTest {
     PcapOptions.END_TIME_MS.put(config, 1000L);
     PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
     PcapOptions.PRINT_JOB_STATUS.put(config, true);
+    PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "10");
 
     when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
 
@@ -216,7 +212,8 @@ public class PcapCliTest {
             "-include_reverse",
             "-num_reducers", "10",
             "-records_per_file", "1000",
-            "-yq", "pcap"
+            "-yq", "pcap",
+            "-finalizer_threads", "10"
     };
     Map<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
@@ -242,6 +239,7 @@ public class PcapCliTest {
     PcapOptions.HADOOP_CONF.put(config, new HashMap<String, Object>() {{
       put(MRJobConfig.QUEUE_NAME, "pcap");
     }});
+    PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "10");
 
     when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
 
@@ -275,6 +273,7 @@ public class PcapCliTest {
     PcapOptions.FIELDS.put(config, query);
     PcapOptions.NUM_REDUCERS.put(config, 10);
     PcapOptions.START_TIME_MS.put(config, 500L);
+    PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "1");
 
     when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
 
@@ -293,7 +292,8 @@ public class PcapCliTest {
             "-base_path", "/base/path",
             "-base_output_path", "/base/output/path",
             "-query", "some query string",
-            "-records_per_file", "1000"
+            "-records_per_file", "1000",
+            "-finalizer_threads", "10"
     };
 
     String query = "some query string";
@@ -306,6 +306,7 @@ public class PcapCliTest {
     PcapOptions.END_TIME_MS.put(config, 1000L);  // needed bc defaults in config
     PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
     PcapOptions.PRINT_JOB_STATUS.put(config, true);
+    PcapOptions.FINALIZER_THREADPOOL_SIZE.put(config, "10");
 
     when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
index c98e681..3dbf92d 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
@@ -64,7 +64,7 @@ public class PcapPages implements Pageable<Path> {
     return new PcapIterator(files.iterator());
   }
 
-  private class PcapIterator implements Iterator<Path> {
+  private static class PcapIterator implements Iterator<Path> {
 
     private Iterator<Path> delegateIt;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
index abf35d0..f2ad653 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
@@ -29,6 +29,7 @@ import java.util.Optional;
 import java.util.function.Function;
 
 public class PcapConfig extends AbstractMapDecorator<String, Object>{
+
   public interface PrefixStrategy extends Function<Clock, String>{}
 
   private boolean showHelp;
@@ -147,4 +148,9 @@ public class PcapConfig extends AbstractMapDecorator<String, Object>{
   public Optional<String> getYarnQueue() {
     return Optional.ofNullable(yarnQueue);
   }
+
+  public void setFinalizerThreadpoolSize(String numThreads) {
+    PcapOptions.FINALIZER_THREADPOOL_SIZE.put(this, numThreads);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
index b8c674c..ebfdad5 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
@@ -25,4 +25,5 @@ public class PcapGlobalDefaults {
   public static final String FINAL_OUTPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/output";
   public static final int NUM_REDUCERS_DEFAULT = 10;
   public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
+  public static final String NUM_FINALIZER_THREADS_DEFAULT = "1";
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
index 203c800..c5852d1 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
@@ -41,7 +41,8 @@ public enum PcapOptions implements ConfigOption {
   FILTER_IMPL("filterImpl"),
   HADOOP_CONF("hadoopConf"),
   FILESYSTEM("fileSystem"),
-  PRINT_JOB_STATUS("printJobStatus");
+  PRINT_JOB_STATUS("printJobStatus"),
+  FINALIZER_THREADPOOL_SIZE("finalizerThreadpoolSize");
 
   public static final BiFunction<String, Object, Path> STRING_TO_PATH =
       (s, o) -> o == null ? null : new Path(o.toString());

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
index 5a61f9b..2eeab3f 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.pcap.finalizer;
 
+import static java.lang.String.format;
 import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT;
 
 import com.google.common.collect.Iterables;
@@ -25,8 +26,11 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -66,8 +70,11 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
     Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class);
     int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE
         .getOrDefault(config, Integer.class, NUM_RECORDS_PER_FILE_DEFAULT);
-    Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
+    Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH
+        .get(config, PcapOptions.STRING_TO_PATH, Path.class);
     FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class);
+    int parallelism = getNumThreads(PcapOptions.FINALIZER_THREADPOOL_SIZE.get(config, String.class));
+    LOG.info("Finalizer running with parallelism set to " + parallelism);
 
     SequenceFileIterable interimResults = null;
     try {
@@ -78,15 +85,14 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
     List<Path> outFiles = new ArrayList<>();
     try {
       Iterable<List<byte[]>> partitions = Iterables.partition(interimResults, recPerFile);
+      Map<Path, List<byte[]>> toWrite = new HashMap<>();
       int part = 1;
       if (partitions.iterator().hasNext()) {
         for (List<byte[]> data : partitions) {
           Path outputPath = getOutputPath(config, part++);
-          if (data.size() > 0) {
-            write(resultsWriter, hadoopConfig, data, outputPath);
-            outFiles.add(outputPath);
-          }
+          toWrite.put(outputPath, data);
         }
+        outFiles = writeParallel(hadoopConfig, toWrite, parallelism);
       } else {
         LOG.info("No results returned.");
       }
@@ -99,10 +105,62 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
         LOG.warn("Unable to cleanup files in HDFS", e);
       }
     }
+    LOG.info("Done finalizing results");
     return new PcapPages(outFiles);
   }
 
-  protected abstract void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig, List<byte[]> data, Path outputPath) throws IOException;
+  /**
+   * Figure out how many threads to use in the thread pool. If it's a string and ends with "C",
+   * then strip the C and treat it as an integral multiple of the number of cores.  If it's a
+   * string and does not end with a C, then treat it as a number in string form.
+   */
+  private static int getNumThreads(String numThreads) throws JobException {
+    String numThreadsStr = ((String) numThreads).trim().toUpperCase();
+    try {
+      if (numThreadsStr.endsWith("C")) {
+        Integer factor = Integer.parseInt(numThreadsStr.replace("C", ""));
+        return factor * Runtime.getRuntime().availableProcessors();
+      } else {
+        return Integer.parseInt(numThreadsStr);
+      }
+    } catch (NumberFormatException e) {
+      throw new JobException(
+          format("Unable to set number of threads for finalizing from property value '%s'",
+              numThreads));
+    }
+  }
+
+  protected List<Path> writeParallel(Configuration hadoopConfig, Map<Path, List<byte[]>> toWrite,
+      int parallelism) throws IOException {
+    List<Path> outFiles = Collections.synchronizedList(new ArrayList<>());
+    ForkJoinPool tp = new ForkJoinPool(parallelism);
+    try {
+      tp.submit(() -> {
+        toWrite.entrySet().parallelStream().forEach(e -> {
+          Path path = e.getKey();
+          List<byte[]> data = e.getValue();
+          if (data.size() > 0) {
+            try {
+              write(getResultsWriter(), hadoopConfig, data, path);
+            } catch (IOException ioe) {
+              throw new RuntimeException(
+                  String.format("Failed to write results to path '%s'", path.toString()), ioe);
+            }
+            outFiles.add(path);
+          }
+        });
+      }).get();
+    } catch (InterruptedException | ExecutionException  e) {
+      throw new IOException("Error finalizing results.", e);
+    } catch (RuntimeException e) {
+      throw new IOException(e.getMessage(), e.getCause());
+    }
+    outFiles.sort((o1, o2) -> o1.getName().compareTo(o2.getName()));
+    return outFiles;
+  }
+
+  protected abstract void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig,
+      List<byte[]> data, Path outputPath) throws IOException;
 
   protected abstract Path getOutputPath(Map<String, Object> config, int partition);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index bf780af..5e82904 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -83,8 +83,8 @@ public class PcapJob<T> implements Statusable<Path> {
   private static final long THREE_SECONDS = 3000;
   private static final long ONE_SECOND = 1000;
   private final OutputDirFormatter outputDirFormatter;
-  private volatile Job mrJob; // store a running MR job reference for async status check
-  private volatile JobStatus jobStatus; // overall job status, including finalization step
+  private Job mrJob; // store a running MR job reference for async status check
+  private JobStatus jobStatus; // overall job status, including finalization step
   private Finalizer<Path> finalizer;
   private Map<String, Object> configuration;
   private Pageable<Path> finalResults;
@@ -212,10 +212,10 @@ public class PcapJob<T> implements Statusable<Path> {
   }
 
   @Override
-  public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> configuration)
+  public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> config)
       throws JobException {
     this.finalizer = finalizer;
-    this.configuration = configuration;
+    this.configuration = config;
     Optional<String> jobName = Optional.ofNullable(PcapOptions.JOB_NAME.get(configuration, String.class));
     Configuration hadoopConf = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class);
     FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class);
@@ -307,8 +307,15 @@ public class PcapJob<T> implements Statusable<Path> {
       }
       return this;
     }
-    mrJob.submit();
-    jobStatus.withState(State.SUBMITTED).withDescription("Job submitted").withJobId(mrJob.getJobID().toString());
+    synchronized (this) {
+      // this block synchronized for proper variable visibility across threads once the status timer
+      // is started. mrJob and jobStatus need to be synchronized so that their references and internal
+      // state are made available to the timer thread. The references to these variables above need
+      // not be synchronized because the job will exit when only 1 thread will have had to use them.
+      mrJob.submit();
+      jobStatus.withState(State.SUBMITTED).withDescription("Job submitted")
+          .withJobId(mrJob.getJobID().toString());
+    }
     startJobStatusTimerThread(statusInterval);
     return this;
   }
@@ -337,45 +344,67 @@ public class PcapJob<T> implements Statusable<Path> {
    *
    * @return true if should continue updating status, false otherwise.
    */
-  private synchronized boolean updateStatus() {
+  private boolean updateStatus() {
+    JobStatus tempStatus = null;
+    final float mrJobFraction = 0.75f; // fraction of total job progress calculation we're allocating to the MR job vs finalization
+    synchronized (this) {
+      tempStatus = new JobStatus(jobStatus);
+    }
+    boolean keepUpdating = true;
     try {
-      org.apache.hadoop.mapreduce.JobStatus mrJobStatus = mrJob.getStatus();
-      org.apache.hadoop.mapreduce.JobStatus.State mrJobState = mrJob.getStatus().getState();
-      if (mrJob.isComplete()) {
-        jobStatus.withPercentComplete(100.0);
+      boolean mrJobComplete = false;
+      org.apache.hadoop.mapreduce.JobStatus.State mrJobState = null;
+      String mrJobFailureInfo = null;
+      float mapProg = 0.0f;
+      float reduceProg = 0.0f;
+      synchronized (this) {
+        mrJobComplete = mrJob.isComplete();
+        org.apache.hadoop.mapreduce.JobStatus mrJobStatus = mrJob.getStatus();
+        mrJobState = mrJobStatus.getState();
+        mrJobFailureInfo = mrJobStatus.getFailureInfo();
+        mapProg = mrJob.mapProgress();
+        reduceProg = mrJob.reduceProgress();
+      }
+      if (mrJobComplete) {
         switch (mrJobState) {
           case SUCCEEDED:
-            jobStatus.withState(State.FINALIZING).withDescription("Finalizing job.");
+            tempStatus.withPercentComplete(100.0 * mrJobFraction).withState(State.FINALIZING).withDescription("Finalizing job.");
             try {
+              synchronized (this) {
+                // want to update the description while the job is finalizing
+                jobStatus = new JobStatus(tempStatus);
+              }
               setFinalResults(finalizer, configuration);
-              jobStatus.withState(State.SUCCEEDED).withDescription("Job completed.");
+              tempStatus.withPercentComplete(100.0).withState(State.SUCCEEDED).withDescription("Job completed.");
             } catch (JobException je) {
-              jobStatus.withState(State.FAILED).withDescription("Job finalize failed.")
+              tempStatus.withPercentComplete(100.0).withState(State.FAILED).withDescription("Job finalize failed.")
                   .withFailureException(je);
             }
             break;
           case FAILED:
-            jobStatus.withState(State.FAILED).withDescription(mrJob.getStatus().getFailureInfo());
+            tempStatus.withPercentComplete(100.0).withState(State.FAILED).withDescription(mrJobFailureInfo);
             break;
           case KILLED:
-            jobStatus.withState(State.KILLED).withDescription(mrJob.getStatus().getFailureInfo());
+            tempStatus.withPercentComplete(100.0).withState(State.KILLED).withDescription(mrJobFailureInfo);
             break;
         }
-        return false;
+        keepUpdating = false;
       } else {
-        float mapProg = mrJob.mapProgress();
-        float reduceProg = mrJob.reduceProgress();
-        float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100;
+        float mrJobProgress = ((mapProg / 2) + (reduceProg / 2)) * 100;
+        float totalProgress = mrJobProgress * mrJobFraction;
         String description = String
             .format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100);
-        jobStatus.withPercentComplete(totalProgress).withState(State.RUNNING)
+        tempStatus.withPercentComplete(totalProgress).withState(State.RUNNING)
             .withDescription(description);
       }
     } catch (InterruptedException | IOException e) {
-      jobStatus.withPercentComplete(100.0).withState(State.FAILED).withFailureException(e);
-      return false;
+      tempStatus.withPercentComplete(100.0).withState(State.FAILED).withFailureException(e);
+      keepUpdating = false;
     }
-    return true;
+    synchronized (this) {
+      jobStatus = new JobStatus(tempStatus);
+    }
+    return keepUpdating;
   }
 
   /**
@@ -489,6 +518,8 @@ public class PcapJob<T> implements Statusable<Path> {
           || status.getState() == State.KILLED
           || status.getState() == State.FAILED) {
         return getFinalResults();
+      } else {
+        LOG.info("Percent complete: {}, description: {}", status.getPercentComplete(), status.getDescription());
       }
       Thread.sleep(completeCheckInterval);
     }
@@ -499,8 +530,11 @@ public class PcapJob<T> implements Statusable<Path> {
   }
 
   @Override
-  public synchronized boolean isDone() {
-    State jobState = jobStatus.getState();
+  public boolean isDone() {
+    State jobState = null;
+    synchronized (this) {
+      jobState = jobStatus.getState();
+    }
     return (jobState == State.SUCCEEDED
         || jobState == State.KILLED
         || jobState == State.FAILED);
@@ -509,7 +543,9 @@ public class PcapJob<T> implements Statusable<Path> {
   @Override
   public void kill() throws JobException {
     try {
-      mrJob.killJob();
+      synchronized (this) {
+        mrJob.killJob();
+      }
     } catch (IOException e) {
       throw new JobException("Unable to kill pcap job.", e);
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/d9e1f381/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
index d5ef2dc..bbac79a 100644
--- a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
@@ -244,13 +244,27 @@ public class PcapJobTest {
     JobStatus status = statusable.getStatus();
     Assert.assertThat(status.getState(), equalTo(State.RUNNING));
     Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%"));
-    Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
+    Assert.assertThat(status.getPercentComplete(), equalTo(25.0 * 0.75));
     when(mrJob.mapProgress()).thenReturn(1.0f);
     when(mrJob.reduceProgress()).thenReturn(0.5f);
     timer.updateJobStatus();
     status = statusable.getStatus();
     Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%"));
+    Assert.assertThat(status.getPercentComplete(), equalTo(75.0 * 0.75));
+    when(mrJob.mapProgress()).thenReturn(1.0f);
+    when(mrJob.reduceProgress()).thenReturn(1.0f);
+    timer.updateJobStatus();
+    status = statusable.getStatus();
+    Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 100.0%"));
     Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
+    when(mrJob.isComplete()).thenReturn(true);
+    when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    when(mrJob.mapProgress()).thenReturn(1.0f);
+    when(mrJob.reduceProgress()).thenReturn(1.0f);
+    timer.updateJobStatus();
+    status = statusable.getStatus();
+    Assert.assertThat(status.getDescription(), equalTo("Job completed."));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
   }
 
   @Test


Mime
View raw message