metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject [07/51] [abbrv] metron git commit: METRON-1674 Create REST endpoint for job status abstraction (merrimanr) closes apache/metron#1109
Date Fri, 17 Aug 2018 15:34:22 GMT
METRON-1674 Create REST endpoint for job status abstraction (merrimanr) closes apache/metron#1109


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/39ae9f46
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/39ae9f46
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/39ae9f46

Branch: refs/heads/master
Commit: 39ae9f4642073d3d4f0fa423339dd97f85974588
Parents: dbbf624
Author: merrimanr <merrimanr@gmail.com>
Authored: Thu Jul 19 11:01:49 2018 -0500
Committer: rmerriman <merrimanr@gmail.com>
Committed: Thu Jul 19 11:01:49 2018 -0500

----------------------------------------------------------------------
 .../rest/model/pcap/FixedPcapOptions.java       |  42 ++++
 .../rest/model/pcap/FixedPcapRequest.java       |  72 ++++--
 .../metron/rest/model/pcap/PcapRequest.java     |  65 +++---
 .../metron/rest/model/pcap/PcapStatus.java      |  91 ++++++++
 .../apache/metron/rest/MetronRestConstants.java |   6 +-
 .../apache/metron/rest/config/PcapConfig.java   |  14 +-
 .../metron/rest/config/PcapJobSupplier.java     |  54 +++++
 .../metron/rest/controller/PcapController.java  |  34 +--
 .../apache/metron/rest/service/PcapService.java |   6 +-
 .../rest/service/impl/PcapServiceImpl.java      | 126 ++++++-----
 .../src/main/resources/application.yml          |   6 +-
 .../apache/metron/rest/config/TestConfig.java   |  17 +-
 .../PcapControllerIntegrationTest.java          | 127 ++++++++++-
 .../apache/metron/rest/mock/MockPcapJob.java    | 106 ++++++---
 .../metron/rest/mock/MockPcapJobSupplier.java   |  36 +++
 .../rest/service/impl/PcapServiceImplTest.java  | 217 +++++++++++++------
 .../common/configuration/ConfigOption.java      |  12 +-
 .../apache/metron/job/JobNotFoundException.java |  30 +++
 .../apache/metron/job/RuntimeJobException.java  |  30 +++
 .../metron/job/manager/InMemoryJobManager.java  |  11 +-
 .../org/apache/metron/pcap/query/PcapCli.java   |  12 -
 .../PcapTopologyIntegrationTest.java            |   6 +-
 .../apache/metron/pcap/query/PcapCliTest.java   |   2 -
 .../apache/metron/pcap/config/PcapOptions.java  |   2 +
 .../metron/pcap/finalizer/PcapCliFinalizer.java |   4 +-
 .../metron/pcap/finalizer/PcapFinalizer.java    |   8 +-
 .../pcap/finalizer/PcapRestFinalizer.java       |  22 +-
 .../java/org/apache/metron/pcap/mr/PcapJob.java |  18 +-
 28 files changed, 882 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java
new file mode 100644
index 0000000..5e77005
--- /dev/null
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.model.pcap;
+
+import org.apache.metron.common.configuration.ConfigOption;
+
+public enum FixedPcapOptions implements ConfigOption {
+  IP_SRC_ADDR("ipSrcAddr"),
+  IP_DST_ADDR("ipDstAddr"),
+  IP_SRC_PORT("ipSrcPort"),
+  IP_DST_PORT("ipDstPort"),
+  PROTOCOL("protocol"),
+  PACKET_FILTER("packetFilter"),
+  INCLUDE_REVERSE("includeReverse")
+  ;
+
+  String key;
+
+  FixedPcapOptions(String key) {
+    this.key = key;
+  }
+
+  @Override
+  public String getKey() {
+    return key;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
index 758340b..a2d345b 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
@@ -17,70 +17,100 @@
  */
 package org.apache.metron.rest.model.pcap;
 
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+
+import java.util.HashMap;
+import java.util.Map;
+
 public class FixedPcapRequest extends PcapRequest {
 
-  private String ipSrcAddr;
-  private String ipDstAddr;
-  private Integer ipSrcPort;
-  private Integer ipDstPort;
-  private String protocol;
-  private String packetFilter;
-  private Boolean includeReverse = false;
+  public FixedPcapRequest() {
+    PcapOptions.FILTER_IMPL.put(this, new FixedPcapFilter.Configurator());
+  }
 
   public String getIpSrcAddr() {
-    return ipSrcAddr;
+    return FixedPcapOptions.IP_SRC_ADDR.get(this, String.class);
   }
 
   public void setIpSrcAddr(String ipSrcAddr) {
-    this.ipSrcAddr = ipSrcAddr;
+    FixedPcapOptions.IP_SRC_ADDR.put(this, ipSrcAddr);
   }
 
   public String getIpDstAddr() {
-    return ipDstAddr;
+    return FixedPcapOptions.IP_DST_ADDR.get(this, String.class);
   }
 
   public void setIpDstAddr(String ipDstAddr) {
-    this.ipDstAddr = ipDstAddr;
+    FixedPcapOptions.IP_DST_ADDR.put(this, ipDstAddr);
   }
 
   public Integer getIpSrcPort() {
-    return ipSrcPort;
+    return FixedPcapOptions.IP_SRC_PORT.get(this, Integer.class);
   }
 
   public void setIpSrcPort(Integer ipSrcPort) {
-    this.ipSrcPort = ipSrcPort;
+    FixedPcapOptions.IP_SRC_PORT.put(this, ipSrcPort);
   }
 
   public Integer getIpDstPort() {
-    return ipDstPort;
+    return FixedPcapOptions.IP_DST_PORT.get(this, Integer.class);
   }
 
   public void setIpDstPort(Integer ipDstPort) {
-    this.ipDstPort = ipDstPort;
+    FixedPcapOptions.IP_DST_PORT.put(this, ipDstPort);
   }
 
   public String getProtocol() {
-    return protocol;
+    return FixedPcapOptions.PROTOCOL.get(this, String.class);
   }
 
   public void setProtocol(String protocol) {
-    this.protocol = protocol;
+    FixedPcapOptions.PROTOCOL.put(this, protocol);
   }
 
   public String getPacketFilter() {
-    return packetFilter;
+    return FixedPcapOptions.PACKET_FILTER.get(this, String.class);
   }
 
   public void setPacketFilter(String packetFilter) {
-    this.packetFilter = packetFilter;
+    FixedPcapOptions.PACKET_FILTER.put(this, packetFilter);
   }
 
   public Boolean getIncludeReverse() {
-    return includeReverse;
+    return FixedPcapOptions.INCLUDE_REVERSE.get(this, Boolean.class);
   }
 
   public void setIncludeReverse(Boolean includeReverse) {
-    this.includeReverse = includeReverse;
+    FixedPcapOptions.INCLUDE_REVERSE.put(this, includeReverse);
+  }
+
+  public void setFields() {
+    Map<String, String> fields = new HashMap<>();
+    if (getIpSrcAddr() != null) {
+      fields.put(Constants.Fields.SRC_ADDR.getName(), getIpSrcAddr());
+    }
+    if (getIpDstAddr() != null) {
+      fields.put(Constants.Fields.DST_ADDR.getName(), getIpDstAddr());
+    }
+    if (getIpSrcPort() != null) {
+      fields.put(Constants.Fields.SRC_PORT.getName(), getIpSrcPort().toString());
+    }
+    if (getIpDstPort() != null) {
+      fields.put(Constants.Fields.DST_PORT.getName(), getIpDstPort().toString());
+    }
+    if (getProtocol() != null) {
+      fields.put(Constants.Fields.PROTOCOL.getName(), getProtocol());
+    }
+    if (getIncludeReverse() != null) {
+      fields.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), getIncludeReverse().toString());
+    }
+    if (getPacketFilter() != null) {
+      fields.put(PcapHelper.PacketFields.PACKET_FILTER.getName(), getPacketFilter());
+    }
+    PcapOptions.FIELDS.put(this, fields);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
index 5941d17..64ed932 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
@@ -17,48 +17,57 @@
  */
 package org.apache.metron.rest.model.pcap;
 
-// TODO reconcile with pcapmrjob
-
 import org.apache.commons.collections4.map.AbstractMapDecorator;
 import org.apache.metron.pcap.config.PcapOptions;
 
+import java.util.HashMap;
+
 public class PcapRequest extends AbstractMapDecorator<String, Object> {
 
   public PcapRequest() {
-    setStartTime(0L);
-    setEndTime(System.currentTimeMillis());
-    setNumReducers(1);
+    super(new HashMap<>());
+    setStartTimeMs(0L);
+    setEndTimeMs(System.currentTimeMillis());
+    setNumReducers(10);
+  }
+
+  public String getBasePath() {
+    return PcapOptions.BASE_PATH.get(this, String.class);
+  }
+
+  public void setBasePath(String basePath) {
+    PcapOptions.BASE_PATH.put(this, basePath);
   }
 
-  public String getBaseOutputPath() {
+  public String getBaseInterimResultPath() {
     return PcapOptions.BASE_INTERIM_RESULT_PATH.get(this, String.class);
   }
 
-  public void setBaseOutputPath(String baseOutputPath) {
-    PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseOutputPath);
+  public void setBaseInterimResultPath(String baseInterimResultPath) {
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseInterimResultPath);
   }
 
-  public String getBasePath() {
-    return PcapOptions.BASE_PATH.get(this, String.class);
+  public String getFinalOutputPath() {
+    return PcapOptions.FINAL_OUTPUT_PATH.get(this, String.class);
   }
 
-  public void setBasePath(String basePath) {
-    PcapOptions.BASE_PATH.put(this, basePath);
+  public void setFinalOutputPath(String finalOutputPath) {
+    PcapOptions.FINAL_OUTPUT_PATH.put(this, finalOutputPath);
   }
 
-  public Long getStartTime() {
+  public Long getStartTimeMs() {
     return PcapOptions.START_TIME_MS.get(this, Long.class);
   }
 
-  public void setStartTime(Long startTime) {
+  public void setStartTimeMs(Long startTime) {
     PcapOptions.START_TIME_MS.put(this, startTime);
   }
 
-  public Long getEndTime() {
+  public Long getEndTimeMs() {
     return PcapOptions.END_TIME_MS.get(this, Long.class);
   }
 
-  public void setEndTime(Long endTime) {
+  public void setEndTimeMs(Long endTime) {
     PcapOptions.END_TIME_MS.put(this, endTime);
   }
 
@@ -69,28 +78,4 @@ public class PcapRequest extends AbstractMapDecorator<String, Object> {
   public void setNumReducers(Integer numReducers) {
     PcapOptions.NUM_REDUCERS.put(this, numReducers);
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    PcapRequest pcapRequest = (PcapRequest) o;
-
-    return (getBaseOutputPath() != null ? getBaseOutputPath().equals(pcapRequest.getBaseOutputPath()) : pcapRequest.getBaseOutputPath() != null) &&
-            (getBasePath() != null ? getBasePath().equals(pcapRequest.getBasePath()) : pcapRequest.getBasePath() == null) &&
-            (getStartTime() != null ? getStartTime().equals(pcapRequest.getStartTime()) : pcapRequest.getStartTime() == null) &&
-            (getEndTime() != null ? getEndTime().equals(pcapRequest.getEndTime()) : pcapRequest.getEndTime() == null) &&
-            (getNumReducers() != null ? getNumReducers().equals(pcapRequest.getNumReducers()) : pcapRequest.getNumReducers() == null);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = getBaseOutputPath() != null ? getBaseOutputPath().hashCode() : 0;
-    result = 31 * result + (getBasePath() != null ? getBasePath().hashCode() : 0);
-    result = 31 * result + (getStartTime() != null ? getStartTime().hashCode() : 0);
-    result = 31 * result + (getEndTime() != null ? getEndTime().hashCode() : 0);
-    result = 31 * result + (getNumReducers() != null ? getNumReducers().hashCode() : 0);
-    return result;
-  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java
new file mode 100644
index 0000000..f004eb5
--- /dev/null
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.model.pcap;
+
+public class PcapStatus {
+
+  private String jobId;
+  private String jobStatus;
+  private String description;
+  private Double percentComplete = 0.0;
+  private Integer pageTotal = 0;
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  public String getJobStatus() {
+    return jobStatus;
+  }
+
+  public void setJobStatus(String jobStatus) {
+    this.jobStatus = jobStatus;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  public Double getPercentComplete() {
+    return percentComplete;
+  }
+
+  public void setPercentComplete(Double percentComplete) {
+    this.percentComplete = percentComplete;
+  }
+
+  public Integer getPageTotal() {
+    return pageTotal;
+  }
+
+  public void setPageTotal(Integer size) {
+    this.pageTotal = size;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    PcapStatus pcapStatus = (PcapStatus) o;
+
+    return (getJobId() != null ? getJobId().equals(pcapStatus.getJobId()) : pcapStatus.getJobId() != null) &&
+            (getJobStatus() != null ? getJobStatus().equals(pcapStatus.getJobStatus()) : pcapStatus.getJobStatus() != null) &&
+            (getDescription() != null ? getDescription().equals(pcapStatus.getDescription()) : pcapStatus.getDescription() != null) &&
+            (getPercentComplete() != null ? getPercentComplete().equals(pcapStatus.getPercentComplete()) : pcapStatus.getPercentComplete() != null) &&
+            (getPageTotal() != null ? getPageTotal().equals(pcapStatus.getPageTotal()) : pcapStatus.getPageTotal() != null);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (getJobId() != null ? getJobId().hashCode() : 0);
+    result = 31 * result + (getJobStatus() != null ? getJobStatus().hashCode() : 0);
+    result = 31 * result + (getDescription() != null ? getDescription().hashCode() : 0);
+    result = 31 * result + (getPercentComplete() != null ? getPercentComplete().hashCode() : 0);
+    result = 31 * result + (getPageTotal() != null ? getPageTotal().hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 0989d12..8e14e38 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -75,6 +75,8 @@ public class MetronRestConstants {
 
   public static final String LOGGING_SYSTEM_PROPERTY = "org.springframework.boot.logging.LoggingSystem";
 
-  public static final String PCAP_INPUT_PATH_SPRING_PROPERTY = "pcap.input.path";
-  public static final String PCAP_OUTPUT_PATH_SPRING_PROPERTY = "pcap.output.path";
+  public static final String PCAP_BASE_PATH_SPRING_PROPERTY = "pcap.base.path";
+  public static final String PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY = "pcap.base.interim.result.path";
+  public static final String PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY = "pcap.final.output.path";
+  public static final String PCAP_PAGE_SIZE_SPRING_PROPERTY = "pcap.page.size";
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
index 8da5f96..a0b7f18 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
@@ -17,7 +17,8 @@
  */
 package org.apache.metron.rest.config;
 
-import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.job.manager.InMemoryJobManager;
+import org.apache.metron.job.manager.JobManager;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;
@@ -29,7 +30,14 @@ import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 public class PcapConfig {
 
   @Bean
-  public PcapJob pcapJob() {
-    return new PcapJob();
+  public JobManager jobManager() {
+    return new InMemoryJobManager();
   }
+
+  @Bean
+  public PcapJobSupplier pcapJobSupplier() {
+    return new PcapJobSupplier();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
new file mode 100644
index 0000000..1e79f6a
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.config;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.RuntimeJobException;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
+import org.apache.metron.pcap.finalizer.PcapRestFinalizer;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.rest.model.pcap.PcapRequest;
+
+import java.util.function.Supplier;
+
+public class PcapJobSupplier implements Supplier<Statusable<Path>> {
+
+  private PcapRequest pcapRequest;
+
+  @Override
+  public Statusable<Path> get() {
+    try {
+      PcapJob<Path> pcapJob = createPcapJob();
+      return pcapJob.submit(PcapFinalizerStrategies.REST, pcapRequest);
+    } catch (JobException e) {
+      throw new RuntimeJobException(e.getMessage());
+    }
+  }
+
+  public void setPcapRequest(PcapRequest pcapRequest) {
+    this.pcapRequest = pcapRequest;
+  }
+
+  protected PcapJob createPcapJob() {
+    return new PcapJob();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
index 3524a8c..38bffb4 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
@@ -21,10 +21,14 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import org.apache.hadoop.fs.Path;
 import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.PcapResponse;
 import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
+import org.apache.metron.rest.security.SecurityUtils;
 import org.apache.metron.rest.service.PcapService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
@@ -33,8 +37,12 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+import java.util.Set;
+
 @RestController
 @RequestMapping("/api/v1/pcap")
 public class PcapController {
@@ -45,27 +53,23 @@ public class PcapController {
   @ApiOperation(value = "Executes a Fixed Pcap Query.")
   @ApiResponses(value = { @ApiResponse(message = "Returns a job status with job ID.", code = 200)})
   @RequestMapping(value = "/fixed", method = RequestMethod.POST)
-  ResponseEntity<JobStatus> fixed(@ApiParam(name="fixedPcapRequest", value="A Fixed Pcap Request"
+  ResponseEntity<PcapStatus> fixed(@ApiParam(name="fixedPcapRequest", value="A Fixed Pcap Request"
           + " which includes fixed filter fields like ip source address and protocol.", required=true)@RequestBody FixedPcapRequest fixedPcapRequest) throws RestException {
-    JobStatus jobStatus = pcapQueryService.fixed(fixedPcapRequest);
-    return new ResponseEntity<>(jobStatus, HttpStatus.OK);
+    PcapStatus pcapStatus = pcapQueryService.fixed(SecurityUtils.getCurrentUser(), fixedPcapRequest);
+    return new ResponseEntity<>(pcapStatus, HttpStatus.OK);
   }
 
   @ApiOperation(value = "Gets job status for running job.")
   @ApiResponses(value = { @ApiResponse(message = "Returns a job status for the passed job.", code = 200)})
-  @RequestMapping(value = "/getStatus", method = RequestMethod.GET)
-  ResponseEntity<JobStatus> getStatus(@ApiParam(name="jobId", value="Job ID of submitted job"
+  @RequestMapping(value = "/{jobId}", method = RequestMethod.GET)
+  ResponseEntity<PcapStatus> getStatus(@ApiParam(name="jobId", value="Job ID of submitted job"
       + " which includes fixed filter fields like ip source address and protocol.", required=true)@PathVariable String jobId) throws RestException {
-    JobStatus jobStatus = pcapQueryService.getJobStatus("metron", jobId);
-    return new ResponseEntity<>(jobStatus, HttpStatus.OK);
-  }
+    PcapStatus jobStatus = pcapQueryService.getJobStatus(SecurityUtils.getCurrentUser(), jobId);
+    if (jobStatus != null) {
+      return new ResponseEntity<>(jobStatus, HttpStatus.OK);
+    } else {
+      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+    }
 
-  @ApiOperation(value = "Gets results of a pcap job.")
-  @ApiResponses(value = { @ApiResponse(message = "Returns a PcapResponse containing an array of pcaps.", code = 200)})
-  @RequestMapping(value = "/getPage", method = RequestMethod.GET)
-  ResponseEntity<PcapResponse> getPage(@ApiParam(name="fixedPcapRequest", value="Job ID of submitted job"
-      + " which includes fixed filter fields like ip source address and protocol.", required=true)@RequestBody String jobId, int pageNum) throws RestException {
-    PcapResponse pcapResponse = pcapQueryService.getPage("metron", jobId, pageNum);
-    return new ResponseEntity<>(pcapResponse, HttpStatus.OK);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
index ce8372c..603e013 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
@@ -18,10 +18,12 @@
 package org.apache.metron.rest.service;
 
 import org.apache.metron.rest.RestException;
-import org.apache.metron.rest.model.PcapResponse;
 import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
 
 public interface PcapService {
 
-  PcapResponse fixed(FixedPcapRequest fixedPcapRequest) throws RestException;
+  PcapStatus fixed(String username, FixedPcapRequest fixedPcapRequest) throws RestException;
+
+  PcapStatus getJobStatus(String username, String jobId) throws RestException;
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
index 4dae1e5..218e9be 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
@@ -20,101 +20,107 @@ package org.apache.metron.rest.service.impl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
-import org.apache.metron.common.utils.timestamp.TimestampConverters;
-import org.apache.metron.pcap.PcapHelper;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobNotFoundException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.job.manager.JobManager;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.finalizer.PcapRestFinalizer;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
-import org.apache.metron.rest.model.PcapResponse;
+import org.apache.metron.rest.config.PcapJobSupplier;
 import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
 import org.apache.metron.rest.service.PcapService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 @Service
 public class PcapServiceImpl implements PcapService {
 
   private Environment environment;
   private Configuration configuration;
-  private PcapJob pcapJob;
+  private PcapJobSupplier pcapJobSupplier;
+  private JobManager<Path> jobManager;
 
   @Autowired
-  public PcapServiceImpl(Environment environment, Configuration configuration, PcapJob pcapJob) {
+  public PcapServiceImpl(Environment environment, Configuration configuration, PcapJobSupplier pcapJobSupplier, JobManager<Path> jobManager) {
     this.environment = environment;
     this.configuration = configuration;
-    this.pcapJob = pcapJob;
+    this.pcapJobSupplier = pcapJobSupplier;
+    this.jobManager = jobManager;
   }
 
   @Override
-  public PcapResponse fixed(FixedPcapRequest fixedPcapRequest) throws RestException {
-    if (fixedPcapRequest.getBasePath() == null) {
-      fixedPcapRequest.setBasePath(environment.getProperty(MetronRestConstants.PCAP_INPUT_PATH_SPRING_PROPERTY));
-    }
-    if (fixedPcapRequest.getBaseOutputPath() == null) {
-      fixedPcapRequest.setBaseOutputPath(environment.getProperty(MetronRestConstants.PCAP_OUTPUT_PATH_SPRING_PROPERTY));
+  public PcapStatus fixed(String username, FixedPcapRequest fixedPcapRequest) throws RestException {
+    try {
+      setPcapOptions(username, fixedPcapRequest);
+      fixedPcapRequest.setFields();
+      pcapJobSupplier.setPcapRequest(fixedPcapRequest);
+      JobStatus jobStatus = jobManager.submit(pcapJobSupplier, username);
+      return jobStatusToPcapStatus(jobStatus);
+    } catch (IOException | JobException e) {
+      throw new RestException(e);
     }
-    PcapResponse response = new PcapResponse();
-    SequenceFileIterable results;
+  }
+
+  @Override
+  public PcapStatus getJobStatus(String username, String jobId) throws RestException {
+    PcapStatus pcapStatus = null;
     try {
-      results = pcapJob.query(
-              new Path(fixedPcapRequest.getBasePath()),
-              new Path(fixedPcapRequest.getBaseOutputPath()),
-              TimestampConverters.MILLISECONDS.toNanoseconds(fixedPcapRequest.getStartTime()),
-              TimestampConverters.MILLISECONDS.toNanoseconds(fixedPcapRequest.getEndTime()),
-              fixedPcapRequest.getNumReducers(),
-              getFixedFields(fixedPcapRequest),
-              configuration,
-              getFileSystem(),
-              new FixedPcapFilter.Configurator()
-      );
-      if (results != null) {
-        List<byte[]> pcaps = new ArrayList<>();
-        results.iterator().forEachRemaining(pcaps::add);
-        response.setPcaps(pcaps);
+      Statusable<Path> statusable = jobManager.getJob(username, jobId);
+      if (statusable != null) {
+        pcapStatus = jobStatusToPcapStatus(statusable.getStatus());
+        if (statusable.isDone()) {
+          Pageable<Path> pageable = statusable.get();
+          if (pageable != null) {
+            pcapStatus.setPageTotal(pageable.getSize());
+          }
+        }
       }
-    } catch (IOException | ClassNotFoundException | InterruptedException e) {
+    } catch (JobNotFoundException | InterruptedException e) {
+      // do nothing and return null pcapStatus
+    } catch (JobException e) {
       throw new RestException(e);
     }
-    return response;
+    return pcapStatus;
   }
 
-  protected Map<String, String> getFixedFields(FixedPcapRequest fixedPcapRequest) {
-    Map<String, String> fixedFields = new HashMap<>();
-    if (fixedPcapRequest.getIpSrcAddr() != null) {
-      fixedFields.put(Constants.Fields.SRC_ADDR.getName(), fixedPcapRequest.getIpSrcAddr());
-    }
-    if (fixedPcapRequest.getIpDstAddr() != null) {
-      fixedFields.put(Constants.Fields.DST_ADDR.getName(), fixedPcapRequest.getIpDstAddr());
-    }
-    if (fixedPcapRequest.getIpSrcPort() != null) {
-      fixedFields.put(Constants.Fields.SRC_PORT.getName(), fixedPcapRequest.getIpSrcPort().toString());
-    }
-    if (fixedPcapRequest.getIpDstPort() != null) {
-      fixedFields.put(Constants.Fields.DST_PORT.getName(), fixedPcapRequest.getIpDstPort().toString());
-    }
-    if (fixedPcapRequest.getProtocol() != null) {
-      fixedFields.put(Constants.Fields.PROTOCOL.getName(), fixedPcapRequest.getProtocol());
+  protected void setPcapOptions(String username, PcapRequest pcapRequest) throws IOException {
+    PcapOptions.JOB_NAME.put(pcapRequest, "jobName");
+    PcapOptions.USERNAME.put(pcapRequest, username);
+    PcapOptions.HADOOP_CONF.put(pcapRequest, configuration);
+    PcapOptions.FILESYSTEM.put(pcapRequest, getFileSystem());
+
+    if (pcapRequest.getBasePath() == null) {
+      pcapRequest.setBasePath(environment.getProperty(MetronRestConstants.PCAP_BASE_PATH_SPRING_PROPERTY));
     }
-    if (fixedPcapRequest.getIncludeReverse() != null) {
-      fixedFields.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), fixedPcapRequest.getIncludeReverse().toString());
+    if (pcapRequest.getBaseInterimResultPath() == null) {
+      pcapRequest.setBaseInterimResultPath(environment.getProperty(MetronRestConstants.PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY));
     }
-    if (fixedPcapRequest.getPacketFilter() != null) {
-      fixedFields.put(PcapHelper.PacketFields.PACKET_FILTER.getName(), fixedPcapRequest.getPacketFilter());
+    if (pcapRequest.getFinalOutputPath() == null) {
+      pcapRequest.setFinalOutputPath(environment.getProperty(MetronRestConstants.PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY));
     }
-    return fixedFields;
+
+    PcapOptions.NUM_RECORDS_PER_FILE.put(pcapRequest, Integer.parseInt(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY)));
   }
 
   protected FileSystem getFileSystem() throws IOException {
     return FileSystem.get(configuration);
   }
+
+  protected PcapStatus jobStatusToPcapStatus(JobStatus jobStatus) {
+    PcapStatus pcapStatus = new PcapStatus();
+    pcapStatus.setJobId(jobStatus.getJobId());
+    pcapStatus.setJobStatus(jobStatus.getState().toString());
+    pcapStatus.setDescription(jobStatus.getDescription());
+    pcapStatus.setPercentComplete(jobStatus.getPercentComplete());
+    return pcapStatus;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml
index 10c2f50..5fd9d72 100644
--- a/metron-interface/metron-rest/src/main/resources/application.yml
+++ b/metron-interface/metron-rest/src/main/resources/application.yml
@@ -74,5 +74,7 @@ user:
     cf: cf
 
 pcap:
-  input.path: /apps/metron/pcap
-  output.path: /tmp
\ No newline at end of file
+  base.path: /apps/metron/pcap/input
+  base.interim.result.path: /apps/metron/pcap/interim
+  final.output.path: /apps/metron/pcap/output
+  page.size: 10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index a9e70d2..486a7dc 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -37,9 +37,12 @@ import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.job.manager.InMemoryJobManager;
+import org.apache.metron.job.manager.JobManager;
 import org.apache.metron.pcap.mr.PcapJob;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.mock.MockPcapJob;
+import org.apache.metron.rest.mock.MockPcapJobSupplier;
 import org.apache.metron.rest.mock.MockStormCLIClientWrapper;
 import org.apache.metron.rest.mock.MockStormRestTemplate;
 import org.apache.metron.rest.service.impl.StormCLIWrapper;
@@ -189,7 +192,19 @@ public class TestConfig {
   }
 
   @Bean
-  public PcapJob mockPcapJob() {
+  public JobManager jobManager() {
+    return new InMemoryJobManager();
+  }
+
+  @Bean
+  public MockPcapJob mockPcapJob() {
     return new MockPcapJob();
   }
+
+  @Bean
+  public PcapJobSupplier pcapJobSupplier(MockPcapJob mockPcapJob) {
+    MockPcapJobSupplier mockPcapJobSupplier = new MockPcapJobSupplier();
+    mockPcapJobSupplier.setMockPcapJob(mockPcapJob);
+    return mockPcapJobSupplier;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
index 5e4875a..462d83d 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
@@ -18,9 +18,13 @@
 package org.apache.metron.rest.controller;
 
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
 import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.PcapPages;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.rest.mock.MockPcapJob;
 import org.apache.metron.rest.model.PcapResponse;
@@ -43,11 +47,14 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.hamcrest.Matchers.hasSize;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
 import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 @RunWith(SpringRunner.class)
@@ -57,23 +64,38 @@ public class PcapControllerIntegrationTest {
 
   /**
    {
-   "basePath": "/apps/metron/pcap",
-   "baseOutputPath": "/tmp",
-   "endTime": 10,
+   "basePath": "/base/path",
+   "baseInterimResultPath": "/base/interim/result/path",
+   "finalOutputPath": "/final/output/path",
+   "startTimeMs": 10,
+   "endTimeMs": 20,
+   "numReducers": 2,
    "includeReverse": "true",
    "ipDstAddr": "192.168.1.1",
    "ipDstPort": "1000",
    "ipSrcAddr": "192.168.1.2",
    "ipSrcPort": "2000",
-   "numReducers": 2,
    "packetFilter": "filter",
-   "protocol": "TCP",
-   "startTime": 1
+   "protocol": "TCP"
    }
    */
   @Multiline
   public static String fixedJson;
 
+  /**
+   {
+   "includeReverse": "true",
+   "ipDstAddr": "192.168.1.1",
+   "ipDstPort": "1000",
+   "ipSrcAddr": "192.168.1.2",
+   "ipSrcPort": "2000",
+   "packetFilter": "filter",
+   "protocol": "TCP"
+   }
+   */
+  @Multiline
+  public static String fixedWithDefaultsJson;
+
   @Autowired
   private PcapService pcapService;
 
@@ -84,6 +106,7 @@ public class PcapControllerIntegrationTest {
 
   private String pcapUrl = "/api/v1/pcap";
   private String user = "user";
+  private String user2 = "user2";
   private String password = "password";
 
   @Before
@@ -98,22 +121,24 @@ public class PcapControllerIntegrationTest {
   }
 
   @Test
-  public void testFixed() throws Exception {
+  public void testFixedRequest() throws Exception {
     MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob");
     List<byte[]> results = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes());
     mockPcapJob.setResults(results);
+    mockPcapJob.setStatus(new JobStatus().withState(JobStatus.State.RUNNING));
 
     PcapResponse expectedReponse = new PcapResponse();
     expectedReponse.setPcaps(results);
     this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedJson))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(content().json(JSONUtils.INSTANCE.toJSON(expectedReponse, false)));
+            .andExpect(jsonPath("$.jobStatus").value("RUNNING"));
 
-    Assert.assertEquals("/apps/metron/pcap", mockPcapJob.getBasePath());
-    Assert.assertEquals("/tmp", mockPcapJob.getBaseOutputPath());
-    Assert.assertEquals(1, mockPcapJob.getStartTime());
-    Assert.assertEquals(10, mockPcapJob.getEndTime());
+    Assert.assertEquals("/base/path", mockPcapJob.getBasePath());
+    Assert.assertEquals("/base/interim/result/path", mockPcapJob.getBaseInterrimResultPath());
+    Assert.assertEquals("/final/output/path", mockPcapJob.getFinalOutputPath());
+    Assert.assertEquals(10000000, mockPcapJob.getStartTimeNs());
+    Assert.assertEquals(20000000, mockPcapJob.getEndTimeNs());
     Assert.assertEquals(2, mockPcapJob.getNumReducers());
     Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
     Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
@@ -124,6 +149,84 @@ public class PcapControllerIntegrationTest {
     Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()));
     Assert.assertEquals("TCP", actualFixedFields.get(Constants.Fields.PROTOCOL.getName()));
     Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
+  }
+
+
+  @Test
+  public void testFixedRequestDefaults() throws Exception {
+    MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob");
+    mockPcapJob.setStatus(new JobStatus().withState(JobStatus.State.RUNNING));
+    long beforeJobTime = System.currentTimeMillis();
+
+    this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedWithDefaultsJson))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.jobStatus").value("RUNNING"));
+
+    Assert.assertEquals("/apps/metron/pcap/input", mockPcapJob.getBasePath());
+    Assert.assertEquals("/apps/metron/pcap/interim", mockPcapJob.getBaseInterrimResultPath());
+    Assert.assertEquals("/apps/metron/pcap/output", mockPcapJob.getFinalOutputPath());
+    Assert.assertEquals(0, mockPcapJob.getStartTimeNs());
+    Assert.assertTrue(beforeJobTime < mockPcapJob.getEndTimeNs() / 1000000);
+    Assert.assertTrue(System.currentTimeMillis() > mockPcapJob.getEndTimeNs() / 1000000);
+    Assert.assertEquals(10, mockPcapJob.getNumReducers());
+    Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
+    Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
+    Assert.assertEquals("192.168.1.2", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName()));
+    Assert.assertEquals("2000", actualFixedFields.get(Constants.Fields.SRC_PORT.getName()));
+    Assert.assertEquals("192.168.1.1", actualFixedFields.get(Constants.Fields.DST_ADDR.getName()));
+    Assert.assertEquals("1000", actualFixedFields.get(Constants.Fields.DST_PORT.getName()));
+    Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()));
+    Assert.assertEquals("TCP", actualFixedFields.get(Constants.Fields.PROTOCOL.getName()));
+    Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
+  }
+
+  @Test
+  public void testGetStatus() throws Exception {
+    MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob");
+
+    this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+            .andExpect(status().isNotFound());
+
+    mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.RUNNING));
+
+    this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedJson))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.jobId").value("jobId"))
+            .andExpect(jsonPath("$.jobStatus").value("RUNNING"));
+
+    mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.SUCCEEDED));
+
+    Pageable<Path> pageable = new PcapPages(Arrays.asList(new Path("path1"), new Path("path1")));
+    mockPcapJob.setIsDone(true);
+    mockPcapJob.setPageable(pageable);
 
+    this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.jobStatus").value("SUCCEEDED"))
+            .andExpect(jsonPath("$.pageTotal").value(2));
+
+    mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.FINALIZING));
+
+    this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.jobStatus").value("FINALIZING"));
+
+    mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.FAILED));
+
+    this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.jobStatus").value("FAILED"));
+
+    mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.KILLED));
+
+    this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.jobStatus").value("KILLED"));
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
index a7eca31..df65635 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
@@ -17,47 +17,79 @@
  */
 package org.apache.metron.rest.mock;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.hadoop.SequenceFileIterable;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.mr.PcapJob;
 
-public class MockPcapJob extends PcapJob {
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockPcapJob extends PcapJob<Path> {
 
   private String basePath;
-  private String baseOutputPath;
-  private long beginNS;
-  private long endNS;
+  private String baseInterrimResultPath;
+  private String finalOutputPath;
+  private long startTimeNs;
+  private long endTimeNs;
   private int numReducers;
   private Map<String, String> fixedFields;
   private PcapFilterConfigurator filterImpl;
+  private int recPerFile;
   private SequenceFileIterable sequenceFileIterable;
+  private Statusable<Path> statusable;
 
   public MockPcapJob() {
     sequenceFileIterable = mock(SequenceFileIterable.class);
+    statusable = mock(Statusable.class);
   }
 
-  @SuppressWarnings(value = "unchecked")
   @Override
-  public <T> SequenceFileIterable query(Path basePath, Path baseOutputPath, long beginNS, long endNS, int numReducers, T fields, Configuration conf, FileSystem fs, PcapFilterConfigurator<T> filterImpl) throws IOException, ClassNotFoundException, InterruptedException {
-    this.basePath = basePath.toString();
-    this.baseOutputPath = baseOutputPath.toString();
-    this.beginNS = beginNS;
-    this.endNS = endNS;
-    this.numReducers = numReducers;
+  public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> configuration) throws JobException {
+    this.basePath = PcapOptions.BASE_PATH.get(configuration, String.class);
+    this.baseInterrimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.get(configuration, String.class);
+    this.finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(configuration, String.class);
+    this.startTimeNs = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000;
+    this.endTimeNs = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000;
+    this.numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class);
+    Object fields = PcapOptions.FIELDS.get(configuration, Object.class);
     if (fields instanceof Map) {
       this.fixedFields = (Map<String, String>) fields;
     }
-    this.filterImpl = filterImpl;
-    return sequenceFileIterable;
+    this.filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
+    this.recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(configuration, Integer.class);
+    return statusable;
+  }
+
+  @Override
+  public JobStatus getStatus() throws JobException {
+    return statusable.getStatus();
+  }
+
+  @Override
+  public Pageable<Path> get() throws JobException, InterruptedException {
+    return statusable.get();
+  }
+
+  public void setStatus(JobStatus jobStatus) throws JobException {
+    when(statusable.getStatus()).thenReturn(jobStatus);
+  }
+
+  public void setPageable(Pageable<Path> pageable) throws JobException, InterruptedException {
+    when(statusable.get()).thenReturn(pageable);
+  }
+
+  public void setIsDone(boolean isDone) {
+    when(statusable.isDone()).thenReturn(isDone);
   }
 
   public void setResults(List<byte[]> pcaps) {
@@ -68,16 +100,32 @@ public class MockPcapJob extends PcapJob {
     return basePath;
   }
 
-  public String getBaseOutputPath() {
-    return baseOutputPath;
+  public void setBasePath(String basePath) {
+    this.basePath = basePath;
+  }
+
+  public String getBaseInterrimResultPath() {
+    return baseInterrimResultPath;
   }
 
-  public long getStartTime() {
-    return beginNS / 1000000;
+  public void setBaseInterrimResultPath(String baseInterrimResultPath) {
+    this.baseInterrimResultPath = baseInterrimResultPath;
   }
 
-  public long getEndTime() {
-    return endNS / 1000000;
+  public String getFinalOutputPath() {
+    return finalOutputPath;
+  }
+
+  public void setFinalOutputPath(String finalOutputPath) {
+    this.finalOutputPath = finalOutputPath;
+  }
+
+  public long getStartTimeNs() {
+    return startTimeNs;
+  }
+
+  public long getEndTimeNs() {
+    return endTimeNs;
   }
 
   public int getNumReducers() {
@@ -91,4 +139,8 @@ public class MockPcapJob extends PcapJob {
   public PcapFilterConfigurator getFilterImpl() {
     return filterImpl;
   }
+
+  public int getRecPerFile() {
+    return recPerFile;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java
new file mode 100644
index 0000000..9a1ac7f
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.rest.mock;
+
+import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.rest.config.PcapJobSupplier;
+
+public class MockPcapJobSupplier extends PcapJobSupplier {
+
+  private MockPcapJob mockPcapJob = new MockPcapJob();
+
+  @Override
+  protected PcapJob createPcapJob() {
+    return mockPcapJob;
+  }
+
+  public void setMockPcapJob(MockPcapJob mockPcapJob) {
+    this.mockPcapJob = mockPcapJob;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
index 1a11c79..2b6bea3 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
@@ -17,12 +17,23 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.metron.common.Constants;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.manager.InMemoryJobManager;
+import org.apache.metron.job.manager.JobManager;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.config.PcapJobSupplier;
+import org.apache.metron.rest.mock.MockPcapJob;
+import org.apache.metron.rest.mock.MockPcapJobSupplier;
+import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -30,6 +41,15 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.springframework.core.env.Environment;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
 @SuppressWarnings("ALL")
 public class PcapServiceImplTest {
   @Rule
@@ -37,32 +57,28 @@ public class PcapServiceImplTest {
 
   Environment environment;
   Configuration configuration;
-  PcapJob pcapJob;
+  MockPcapJobSupplier mockPcapJobSupplier;
 
   @Before
   public void setUp() throws Exception {
     environment = mock(Environment.class);
-    pcapJob = mock(PcapJob.class);
     configuration = mock(Configuration.class);
+    mockPcapJobSupplier = new MockPcapJobSupplier();
 
-    when(environment.getProperty(MetronRestConstants.PCAP_INPUT_PATH_SPRING_PROPERTY)).thenReturn("/input/path");
-    when(environment.getProperty(MetronRestConstants.PCAP_OUTPUT_PATH_SPRING_PROPERTY)).thenReturn("/output/path");
+    when(environment.getProperty(MetronRestConstants.PCAP_BASE_PATH_SPRING_PROPERTY)).thenReturn("/base/path");
+    when(environment.getProperty(MetronRestConstants.PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY)).thenReturn("/base/interim/result/path");
+    when(environment.getProperty(MetronRestConstants.PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY)).thenReturn("/final/output/path");
+    when(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY)).thenReturn("100");
   }
 
-  // TODO
-
-  @Test
-  public void placeholder() {
-    Assert.assertTrue(true);
-  }
-/*
   @Test
   public void fixedShouldProperlyCallPcapJobQuery() throws Exception {
     FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
-    fixedPcapRequest.setBaseOutputPath("baseOutputPath");
     fixedPcapRequest.setBasePath("basePath");
-    fixedPcapRequest.setStartTime(1L);
-    fixedPcapRequest.setEndTime(2L);
+    fixedPcapRequest.setBaseInterimResultPath("baseOutputPath");
+    fixedPcapRequest.setFinalOutputPath("finalOutputPath");
+    fixedPcapRequest.setStartTimeMs(1L);
+    fixedPcapRequest.setEndTimeMs(2L);
     fixedPcapRequest.setNumReducers(2);
     fixedPcapRequest.setIpSrcAddr("ip_src_addr");
     fixedPcapRequest.setIpDstAddr("ip_dst_addr");
@@ -71,10 +87,19 @@ public class PcapServiceImplTest {
     fixedPcapRequest.setProtocol("tcp");
     fixedPcapRequest.setPacketFilter("filter");
     fixedPcapRequest.setIncludeReverse(true);
+    MockPcapJob mockPcapJob = new MockPcapJob();
+    mockPcapJobSupplier.setMockPcapJob(mockPcapJob);
+    JobManager jobManager = new InMemoryJobManager<>();
 
-    PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob));
+    PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager));
     FileSystem fileSystem = mock(FileSystem.class);
     doReturn(fileSystem).when(pcapService).getFileSystem();
+    mockPcapJob.setStatus(new JobStatus()
+            .withJobId("jobId")
+            .withDescription("description")
+            .withPercentComplete(0L)
+            .withState(JobStatus.State.RUNNING));
+
     Map<String, String> expectedFields = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "ip_src_addr");
       put(Constants.Fields.DST_ADDR.getName(), "ip_dst_addr");
@@ -84,72 +109,128 @@ public class PcapServiceImplTest {
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
       put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "filter");
     }};
-    List<byte[]> expectedPcaps = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes());
-    SequenceFileIterable results = mock(SequenceFileIterable.class);
-    when(results.iterator()).thenReturn(expectedPcaps.iterator());
-    when(pcapJob.query(eq(new Path("basePath")),
-            eq(new Path("baseOutputPath")),
-            eq(1000000L),
-            eq(2000000L),
-            eq(2),
-            eq(expectedFields),
-            eq(configuration),
-            any(FileSystem.class),
-            any(FixedPcapFilter.Configurator.class))).thenReturn(results);
-
-    PcapResponse pcapsResponse = pcapService.fixed(fixedPcapRequest);
-    Assert.assertEquals(expectedPcaps, pcapsResponse.getPcaps());
+    PcapStatus expectedPcapStatus = new PcapStatus();
+    expectedPcapStatus.setJobId("jobId");
+    expectedPcapStatus.setJobStatus(JobStatus.State.RUNNING.name());
+    expectedPcapStatus.setDescription("description");
+
+    Assert.assertEquals(expectedPcapStatus, pcapService.fixed("user", fixedPcapRequest));
+    Assert.assertEquals(expectedPcapStatus, pcapService.jobStatusToPcapStatus(jobManager.getJob("user", "jobId").getStatus()));
+    Assert.assertEquals("basePath", mockPcapJob.getBasePath());
+    Assert.assertEquals("baseOutputPath", mockPcapJob.getBaseInterrimResultPath());
+    Assert.assertEquals("finalOutputPath", mockPcapJob.getFinalOutputPath());
+    Assert.assertEquals(1000000, mockPcapJob.getStartTimeNs());
+    Assert.assertEquals(2000000, mockPcapJob.getEndTimeNs());
+    Assert.assertEquals(2, mockPcapJob.getNumReducers());
+    Assert.assertEquals(100, mockPcapJob.getRecPerFile());
+    Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
+    Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
+    Assert.assertEquals("ip_src_addr", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName()));
+    Assert.assertEquals("1000", actualFixedFields.get(Constants.Fields.SRC_PORT.getName()));
+    Assert.assertEquals("ip_dst_addr", actualFixedFields.get(Constants.Fields.DST_ADDR.getName()));
+    Assert.assertEquals("2000", actualFixedFields.get(Constants.Fields.DST_PORT.getName()));
+    Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()));
+    Assert.assertEquals("tcp", actualFixedFields.get(Constants.Fields.PROTOCOL.getName()));
+    Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
   }
 
   @Test
   public void fixedShouldProperlyCallPcapJobQueryWithDefaults() throws Exception {
+    long beforeJobTime = System.currentTimeMillis();
+
     FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
+    MockPcapJob mockPcapJob = new MockPcapJob();
+    mockPcapJobSupplier.setMockPcapJob(mockPcapJob);
+    JobManager jobManager = new InMemoryJobManager<>();
 
-    PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob));
+    PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager));
     FileSystem fileSystem = mock(FileSystem.class);
     doReturn(fileSystem).when(pcapService).getFileSystem();
-    Map<String, String> expectedFields = new HashMap<String, String>() {{
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
-    }};
-    List<byte[]> expectedPcaps = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes());
-    SequenceFileIterable results = mock(SequenceFileIterable.class);
-    when(results.iterator()).thenReturn(expectedPcaps.iterator());
-    when(pcapJob.query(eq(new Path("/input/path")),
-            eq(new Path("/output/path")),
-            eq(0L),
-            eq(fixedPcapRequest.getEndTime() * 1000000),
-            eq(1),
-            eq(expectedFields),
-            eq(configuration),
-            any(FileSystem.class),
-            any(FixedPcapFilter.Configurator.class))).thenReturn(results);
-
-    PcapResponse pcapsResponse = pcapService.fixed(fixedPcapRequest);
-    Assert.assertEquals(expectedPcaps, pcapsResponse.getPcaps());
+    mockPcapJob.setStatus(new JobStatus()
+            .withJobId("jobId")
+            .withDescription("description")
+            .withPercentComplete(0L)
+            .withState(JobStatus.State.RUNNING));
+
+    PcapStatus expectedPcapStatus = new PcapStatus();
+    expectedPcapStatus.setJobId("jobId");
+    expectedPcapStatus.setJobStatus(JobStatus.State.RUNNING.name());
+    expectedPcapStatus.setDescription("description");
+
+    Assert.assertEquals(expectedPcapStatus, pcapService.fixed("user", fixedPcapRequest));
+    Assert.assertEquals("/base/path", mockPcapJob.getBasePath());
+    Assert.assertEquals("/base/interim/result/path", mockPcapJob.getBaseInterrimResultPath());
+    Assert.assertEquals("/final/output/path", mockPcapJob.getFinalOutputPath());
+    Assert.assertEquals(0, mockPcapJob.getStartTimeNs());
+    Assert.assertTrue(beforeJobTime <= mockPcapJob.getEndTimeNs() / 1000000);
+    Assert.assertTrue(System.currentTimeMillis() >= mockPcapJob.getEndTimeNs() / 1000000);
+    Assert.assertEquals(10, mockPcapJob.getNumReducers());
+    Assert.assertEquals(100, mockPcapJob.getRecPerFile());
+    Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
+    Assert.assertEquals(new HashMap<>(), mockPcapJob.getFixedFields());
   }
 
   @Test
   public void fixedShouldThrowRestException() throws Exception {
     exception.expect(RestException.class);
-    exception.expectMessage("some exception");
+    exception.expectMessage("some job exception");
 
     FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
-
-    PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob));
+    JobManager jobManager = mock(JobManager.class);
+    PcapJobSupplier pcapJobSupplier = new PcapJobSupplier();
+    PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJobSupplier, jobManager));
     FileSystem fileSystem = mock(FileSystem.class);
     doReturn(fileSystem).when(pcapService).getFileSystem();
+    when(jobManager.submit(pcapJobSupplier, "user")).thenThrow(new JobException("some job exception"));
+
+    pcapService.fixed("user", fixedPcapRequest);
+  }
+
+  @Test
+  public void getStatusShouldProperlyReturnStatus() throws Exception {
+    MockPcapJob mockPcapJob = mock(MockPcapJob.class);
+    JobManager jobManager = mock(JobManager.class);
+    JobStatus actualJobStatus = new JobStatus()
+            .withJobId("jobId")
+            .withState(JobStatus.State.SUCCEEDED)
+            .withDescription("description")
+            .withPercentComplete(100.0);
+    Pageable pageable = mock(Pageable.class);
+    when(pageable.getSize()).thenReturn(2);
+    when(mockPcapJob.getStatus()).thenReturn(actualJobStatus);
+    when(mockPcapJob.isDone()).thenReturn(true);
+    when(mockPcapJob.get()).thenReturn(pageable);
+    when(jobManager.getJob("user", "jobId")).thenReturn(mockPcapJob);
+
+    PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager);
+    PcapStatus expectedPcapStatus = new PcapStatus();
+    expectedPcapStatus.setJobId("jobId");
+    expectedPcapStatus.setJobStatus(JobStatus.State.SUCCEEDED.name());
+    expectedPcapStatus.setDescription("description");
+    expectedPcapStatus.setPercentComplete(100.0);
+    expectedPcapStatus.setPageTotal(2);
+
+    Assert.assertEquals(expectedPcapStatus, pcapService.getJobStatus("user", "jobId"));
+  }
+
+  @Test
+  public void getStatusShouldReturnNullOnMissingStatus() throws Exception {
+    JobManager jobManager = new InMemoryJobManager();
+    PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, new PcapJobSupplier(), jobManager);
 
-    when(pcapJob.query(any(),
-            any(),
-            eq(0L),
-            eq(fixedPcapRequest.getEndTime() * 1000000),
-            eq(1),
-            any(),
-            any(),
-            any(FileSystem.class),
-            any(FixedPcapFilter.Configurator.class))).thenThrow(new IOException("some exception"));
-
-    pcapService.fixed(fixedPcapRequest);
+    Assert.assertNull(pcapService.getJobStatus("user", "jobId"));
   }
-  */
+
+  @Test
+  public void getStatusShouldThrowRestException() throws Exception {
+    exception.expect(RestException.class);
+    exception.expectMessage("some job exception");
+
+    JobManager jobManager = mock(JobManager.class);
+    when(jobManager.getJob("user", "jobId")).thenThrow(new JobException("some job exception"));
+
+    PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, new PcapJobSupplier(), jobManager);
+    pcapService.getJobStatus("user", "jobId");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
index 473664c..8e4211b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
@@ -18,6 +18,8 @@
 
 package org.apache.metron.common.configuration;
 
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
 import java.util.Map;
 import java.util.function.BiFunction;
 
@@ -32,11 +34,17 @@ public interface ConfigOption {
   }
 
   default <T> T get(Map<String, Object> map, Class<T> clazz) {
-    return clazz.cast(map.get(getKey()));
+    Object obj = map.get(getKey());
+    if(clazz.isInstance(obj)) {
+      return clazz.cast(obj);
+    }
+    else {
+      return ConversionUtils.convert(obj, clazz);
+    }
   }
 
   default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform, Class<T> clazz) {
-    return clazz.cast(map.get(getKey()));
+    return clazz.cast(transform.apply(getKey(), map.get(getKey())));
   }
 
   default <T> T getTransformed(Map<String, Object> map, Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java
new file mode 100644
index 0000000..6a677bf
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+public class JobNotFoundException extends JobException {
+
+  public JobNotFoundException(String message) {
+    super(message);
+  }
+
+  public JobNotFoundException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java
new file mode 100644
index 0000000..9013ef8
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+public class RuntimeJobException extends RuntimeException {
+
+  public RuntimeJobException(String message) {
+    super(message);
+  }
+
+  public RuntimeJobException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
index bf0baa7..1340aa5 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobNotFoundException;
 import org.apache.metron.job.JobStatus;
 import org.apache.metron.job.Statusable;
 import org.slf4j.Logger;
@@ -52,7 +53,7 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
 
   @Override
   public JobStatus getStatus(String username, String jobId) throws JobException {
-    return jobs.get(username).get(jobId).getStatus();
+    return getJob(username, jobId).getStatus();
   }
 
   @Override
@@ -67,7 +68,11 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
 
   @Override
   public Statusable<PAGE_T> getJob(String username, String jobId) throws JobException {
-    return getUserJobs(username).get(jobId);
+    Map<String, Statusable<PAGE_T>> jobStatusables = getUserJobs(username);
+    if (jobStatusables.size() > 0 && jobStatusables.containsKey(jobId)) {
+      return jobStatusables.get(jobId);
+    }
+    throw new JobNotFoundException("Could not find job " + jobId + " for user " + username);
   }
 
   private Map<String, Statusable<PAGE_T>> getUserJobs(String username) {
@@ -76,7 +81,7 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
 
   @Override
   public List<Statusable<PAGE_T>> getJobs(String username) throws JobException {
-    return new ArrayList<Statusable<PAGE_T>>(getUserJobs(username).values());
+    return new ArrayList<>(getUserJobs(username).values());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index 3462921..1a23740 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -98,12 +98,6 @@ public class PcapCli {
         fixedParser.printHelp();
         return 0;
       }
-      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
-      long startTime = time.getLeft();
-      long endTime = time.getRight();
-
-      PcapOptions.START_TIME_NS.put(commonConfig, startTime);
-      PcapOptions.END_TIME_NS.put(commonConfig, endTime);
       PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
       PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
       try {
@@ -128,12 +122,6 @@ public class PcapCli {
         queryParser.printHelp();
         return 0;
       }
-      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
-      long startTime = time.getLeft();
-      long endTime = time.getRight();
-
-      PcapOptions.START_TIME_NS.put(commonConfig, startTime);
-      PcapOptions.END_TIME_NS.put(commonConfig, endTime);
       PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
       PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
       try {

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 9ea7912..0be33d6 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -615,8 +615,10 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
 
   private void waitForJob(Statusable statusable) throws Exception {
     for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
-      if (statusable.isDone()) {
-        return;
+      if (!statusable.getStatus().getState().equals(JobStatus.State.RUNNING)) {
+        if (statusable.isDone()) {
+          return;
+        }
       }
     }
     throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " seconds");

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 763f0c6..c7d6fdf 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -208,8 +208,6 @@ public class PcapCliTest {
     PcapOptions.NUM_REDUCERS.put(config, 10);
     PcapOptions.START_TIME_MS.put(config, startAsNanos / 1000000L); // needed bc defaults in config
     PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L);  // needed bc defaults in config
-    PcapOptions.START_TIME_NS.put(config, startAsNanos);
-    PcapOptions.END_TIME_NS.put(config, endAsNanos);
     PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
 
     when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
index 09effd4..3d7c4f6 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
@@ -24,6 +24,8 @@ import org.apache.metron.common.configuration.ConfigOption;
 
 public enum PcapOptions implements ConfigOption {
   JOB_NAME("jobName"),
+  JOB_ID("jobId"),
+  USERNAME("username"),
   FINAL_FILENAME_PREFIX("finalFilenamePrefix"),
   BASE_PATH("basePath", (s, o) -> o == null ? null : new Path(o.toString())),
   INTERIM_RESULT_PATH("interimResultPath", (s, o) -> o == null ? null : new Path(o.toString())),

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
index e032158..c379515 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
@@ -36,10 +36,10 @@ public class PcapCliFinalizer extends PcapFinalizer {
   private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap";
 
   @Override
-  protected String getOutputFileName(Map<String, Object> config, int partition) {
+  protected Path getOutputPath(Map<String, Object> config, int partition) {
     Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
     String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class);
-    return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition);
+    return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
index d5ac675..2c55e15 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -79,10 +79,10 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
       int part = 1;
       if (partitions.iterator().hasNext()) {
         for (List<byte[]> data : partitions) {
-          String outFileName = getOutputFileName(config, part++);
+          Path outputPath = getOutputPath(config, part++);
           if (data.size() > 0) {
-            getResultsWriter().write(hadoopConfig, data, outFileName);
-            outFiles.add(new Path(outFileName));
+            getResultsWriter().write(hadoopConfig, data, outputPath.toUri().getPath());
+            outFiles.add(outputPath);
           }
         }
       } else {
@@ -100,7 +100,7 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
     return new PcapPages(outFiles);
   }
 
-  protected abstract String getOutputFileName(Map<String, Object> config, int partition);
+  protected abstract Path getOutputPath(Map<String, Object> config, int partition);
 
   /**
    * Returns a lazily-read Iterable over a set of sequence files.


Mime
View raw message