metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [28/50] [abbrv] metron git commit: METRON-1728: Handle null values in config in Pcap backend more gracefully (mmiklavc via mmiklavc) closes apache/metron#1151
Date Fri, 31 Aug 2018 19:20:24 GMT
METRON-1728: Handle null values in config in Pcap backend more gracefully (mmiklavc via mmiklavc)
closes apache/metron#1151


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9064cca0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9064cca0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9064cca0

Branch: refs/remotes/apache/feature/METRON-1699-create-batch-profiler
Commit: 9064cca0317881176471c51abd16e99bf2ad7b10
Parents: 14dcb2d
Author: mmiklavc <michael.miklavcic@gmail.com>
Authored: Thu Aug 9 09:25:29 2018 -0600
Committer: Michael Miklavcic <michael.miklavcic@gmail.com>
Committed: Thu Aug 9 09:25:29 2018 -0600

----------------------------------------------------------------------
 .../common/configuration/ConfigOption.java      |  32 ++++--
 .../common/configuration/ConfigOptionTest.java  | 112 +++++++++++++++++++
 .../org/apache/metron/pcap/query/CliParser.java |  25 +++--
 .../org/apache/metron/pcap/PcapJobTest.java     |  23 ++++
 .../apache/metron/pcap/query/PcapCliTest.java   |  10 +-
 .../metron/pcap/config/PcapGlobalDefaults.java  |  28 +++++
 .../metron/pcap/finalizer/PcapFinalizer.java    |   8 +-
 .../pcap/finalizer/PcapRestFinalizer.java       |  11 +-
 .../java/org/apache/metron/pcap/mr/PcapJob.java |  25 +++--
 9 files changed, 237 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
index 8e4211b..6308f0a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
@@ -18,36 +18,54 @@
 
 package org.apache.metron.common.configuration;
 
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-
 import java.util.Map;
 import java.util.function.BiFunction;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 
 public interface ConfigOption {
+
   String getKey();
+
   default BiFunction<String, Object, Object> transform() {
-    return (s,o) -> o;
+    return (s, o) -> o;
   }
 
   default void put(Map<String, Object> map, Object value) {
     map.put(getKey(), value);
   }
 
+  default <T> T getOrDefault(Map<String, Object> map, Class<T> clazz, T defaultValue) {
+    T val;
+    return ((val = get(map, clazz)) == null ? defaultValue : val);
+  }
+
   default <T> T get(Map<String, Object> map, Class<T> clazz) {
     Object obj = map.get(getKey());
-    if(clazz.isInstance(obj)) {
+    if (clazz.isInstance(obj)) {
       return clazz.cast(obj);
-    }
-    else {
+    } else {
       return ConversionUtils.convert(obj, clazz);
     }
   }
 
-  default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform, Class<T> clazz) {
+  default <T> T getOrDefault(Map<String, Object> map, BiFunction<String, Object, T> transform,
+      Class<T> clazz, T defaultValue) {
+    T val;
+    return ((val = get(map, transform, clazz)) == null ? defaultValue : val);
+  }
+
+  default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform,
+      Class<T> clazz) {
     return clazz.cast(transform.apply(getKey(), map.get(getKey())));
   }
 
+  default <T> T getTransformedOrDefault(Map<String, Object> map, Class<T> clazz, T defaultValue) {
+    T val;
+    return ((val = getTransformed(map, clazz)) == null ? defaultValue : val);
+  }
+
   default <T> T getTransformed(Map<String, Object> map, Class<T> clazz) {
     return clazz.cast(transform().apply(getKey(), map.get(getKey())));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java
new file mode 100644
index 0000000..95db080
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the default interface methods
+ */
+public class ConfigOptionTest {
+
+  @Before
+  public void setup() {
+  }
+
+  @Test
+  public void gets_value_of_specified_type() {
+    ConfigOption option = newOption("foo");
+    Map<String, Object> config = new HashMap<>();
+    option.put(config, 25L);
+    assertThat(option.get(config, Long.class), equalTo(25L));
+    assertThat(option.get(mapWith("foo", 25L), Long.class), equalTo(25L));
+  }
+
+  @Test
+  public void gets_value_of_specified_type_with_transform() {
+    ConfigOption option = newOption("foo");
+    Map<String, Object> config = new HashMap<>();
+    option.put(config, "25");
+    BiFunction<String, Object, Long> transform = (s, o) -> o == null ? null
+        : new Long(o.toString());
+    assertThat(option.get(config, transform, Long.class), equalTo(25L));
+    assertThat(option.get(mapWith("foo", "25"), transform, Long.class), equalTo(25L));
+  }
+
+  @Test
+  public void gets_default_value_of_specified_type_with_transform() {
+    ConfigOption option = newOption("foo");
+    Map<String, Object> config = new HashMap<>();
+    option.put(config, null);
+    BiFunction<String, Object, Long> transform = (s, o) -> o == null ? null
+        : new Long(o.toString());
+    assertThat(option.getOrDefault(config, transform, Long.class, 25L), equalTo(25L));
+    assertThat(option.getOrDefault(mapWith("foo", null), transform, Long.class, 25L), equalTo(25L));
+  }
+
+  @Test
+  public void gets_default_when_null_value() {
+    ConfigOption option = newOption("foo");
+    Map<String, Object> config = new HashMap<>();
+    option.put(config, null);
+    assertThat(option.getOrDefault(config, Long.class, 0L), equalTo(0L));
+    assertThat(option.getOrDefault(mapWith("foo", null), Long.class, 0L), equalTo(0L));
+  }
+
+  @Test
+  public void gets_object_transformed_by_class_cast() {
+    ConfigOption option = newOption("foo");
+    Map<String, Object> config = new HashMap<>();
+    option.put(config, (Object) 25L);
+    assertThat(option.getTransformed(config, Long.class), equalTo(25L));
+    assertThat(option.getTransformed(mapWith("foo", (Object) 25L), Long.class), equalTo(25L));
+  }
+
+  @Test
+  public void gets_default_null_with_cast_when_null() {
+    ConfigOption option = newOption("foo");
+    Map<String, Object> config = new HashMap<>();
+    option.put(config, null);
+    assertThat(option.getTransformedOrDefault(config, Long.class, 25L), equalTo(25L));
+    assertThat(option.getTransformedOrDefault(mapWith("foo", null), Long.class, 25L), equalTo(25L));
+  }
+
+  private <K, V> Map<K, V> mapWith(K key, V val) {
+    Map<K, V> map = new HashMap<>();
+    map.put(key, val);
+    return map;
+  }
+
+  private ConfigOption newOption(final String key) {
+    return new ConfigOption() {
+      @Override
+      public String getKey() {
+        return key;
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index 4ad6ffa..2d15e8b 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -18,17 +18,23 @@
 
 package org.apache.metron.pcap.query;
 
-import org.apache.commons.cli.*;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.metron.pcap.config.PcapConfig;
 
 /**
  * Provides commmon required fields for the PCAP filter jobs
  */
 public class CliParser {
-  public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap/input";
-  public static final String BASE_INTERIM_OUTPUT_PATH_DEFAULT = "/tmp";
-  public static final int NUM_REDUCERS_DEFAULT = 10;
-  public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
   private CommandLineParser parser;
   protected PcapConfig.PrefixStrategy prefixStrategy;
 
@@ -40,9 +46,10 @@ public class CliParser {
   public Options buildOptions() {
     Options options = new Options();
     options.addOption(newOption("h", "help", false, "Display help"));
-    options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT)));
+    options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'",
+        BASE_INPUT_PATH_DEFAULT)));
     options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'",
-        BASE_INTERIM_OUTPUT_PATH_DEFAULT)));
+        BASE_INTERIM_RESULT_PATH_DEFAULT)));
     options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
     options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT)));
     options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT)));
@@ -71,12 +78,12 @@ public class CliParser {
     if (commandLine.hasOption("base_path")) {
       config.setBasePath(commandLine.getOptionValue("base_path"));
     } else {
-      config.setBasePath(BASE_PATH_DEFAULT);
+      config.setBasePath(BASE_INPUT_PATH_DEFAULT);
     }
     if (commandLine.hasOption("base_output_path")) {
       config.setBaseInterimResultPath(commandLine.getOptionValue("base_output_path"));
     } else {
-      config.setBaseInterimResultPath(BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+      config.setBaseInterimResultPath(BASE_INTERIM_RESULT_PATH_DEFAULT);
     }
     if (commandLine.hasOption("start_time")) {
       try {

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 14963fd..796c8a5 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -264,4 +264,27 @@ public class PcapJobTest {
     Assert.assertThat(status.getState(), equalTo(State.KILLED));
   }
 
+  @Test
+  public void handles_null_values_with_defaults() throws Exception {
+    PcapOptions.START_TIME_NS.put(config, null);
+    PcapOptions.END_TIME_NS.put(config, null);
+    PcapOptions.NUM_REDUCERS.put(config, null);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, null);
+
+    pageableResult = new PcapPages(
+        Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
+    when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+    when(mrJob.isComplete()).thenReturn(true);
+    when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    timer.updateJobStatus();
+    Pageable<Path> results = statusable.get();
+    Assert.assertThat(results.getSize(), equalTo(3));
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index c7d6fdf..96ca354 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.pcap.query;
 
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.argThat;
@@ -91,8 +93,8 @@ public class PcapCliTest {
       put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`");
     }};
     FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
-    PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
-    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+    PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT);
     PcapOptions.FIELDS.put(config, query);
     PcapOptions.NUM_REDUCERS.put(config, 10);
     PcapOptions.START_TIME_MS.put(config, 500L);
@@ -237,8 +239,8 @@ public class PcapCliTest {
 
     String query = "some query string";
     FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
-    PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
-    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+    PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT);
     PcapOptions.FIELDS.put(config, query);
     PcapOptions.NUM_REDUCERS.put(config, 10);
     PcapOptions.START_TIME_MS.put(config, 500L);

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
new file mode 100644
index 0000000..b8c674c
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.config;
+
+public class PcapGlobalDefaults {
+  public static final String BASE_PCAP_PATH_DEFAULT = "/apps/metron/pcap";
+  public static final String BASE_INPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/input";
+  public static final String BASE_INTERIM_RESULT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/interim";
+  public static final String FINAL_OUTPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/output";
+  public static final int NUM_REDUCERS_DEFAULT = 10;
+  public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
index 8dcc401..5a61f9b 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -18,6 +18,8 @@
 
 package org.apache.metron.pcap.finalizer;
 
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT;
+
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -62,9 +64,9 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
   @Override
   public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
     Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class);
-    int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(config, Integer.class);
-    Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH
-        .get(config, PcapOptions.STRING_TO_PATH, Path.class);
+    int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE
+        .getOrDefault(config, Integer.class, NUM_RECORDS_PER_FILE_DEFAULT);
+    Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
     FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class);
 
     SequenceFileIterable interimResults = null;

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
index 93a3222..13fa795 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
@@ -18,14 +18,15 @@
 
 package org.apache.metron.pcap.finalizer;
 
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.FINAL_OUTPUT_PATH_DEFAULT;
+
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.config.PcapOptions;
-
-import java.util.Map;
 import org.apache.metron.pcap.writer.PcapResultsWriter;
 
 /**
@@ -45,10 +46,12 @@ public class PcapRestFinalizer extends PcapFinalizer {
 
   @Override
   protected Path getOutputPath(Map<String, Object> config, int partition) {
-    String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, String.class);
+    String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH
+        .getOrDefault(config, String.class, FINAL_OUTPUT_PATH_DEFAULT);
     String user = PcapOptions.USERNAME.get(config, String.class);
     String jobId = PcapOptions.JOB_ID.get(config, String.class);
-    return new Path(String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition));
+    return new Path(
+        String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index ea2aa29..23bd510 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -20,6 +20,7 @@ package org.apache.metron.pcap.mr;
 
 import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
 import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT;
 
 import com.google.common.base.Joiner;
 import java.io.IOException;
@@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.job.Finalizer;
 import org.apache.metron.job.JobException;
 import org.apache.metron.job.JobStatus;
@@ -60,6 +62,7 @@ import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.PcapGlobalDefaults;
 import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.PcapFilter;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
@@ -216,20 +219,22 @@ public class PcapJob<T> implements Statusable<Path> {
     Configuration hadoopConf = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class);
     FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class);
     Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class);
-    Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class);
-    long startTime;
+    Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH
+        .getTransformedOrDefault(configuration, Path.class,
+            new Path(PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT));
+    long startTimeNs;
     if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) {
-      startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class);
+      startTimeNs = PcapOptions.START_TIME_NS.getOrDefault(configuration, Long.class, 0L);
     } else {
-      startTime = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000;
+      startTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.START_TIME_MS.getOrDefault(configuration, Long.class, 0L));
     }
-    long endTime;
+    long endTimeNs;
     if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) {
-      endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class);
+      endTimeNs = PcapOptions.END_TIME_NS.getOrDefault(configuration, Long.class, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
     } else {
-      endTime = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000;
+      endTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.END_TIME_MS.getOrDefault(configuration, Long.class, System.currentTimeMillis()));
     }
-    int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class);
+    int numReducers = PcapOptions.NUM_REDUCERS.getOrDefault(configuration, Integer.class, NUM_REDUCERS_DEFAULT);
     T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class);
     PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
 
@@ -237,8 +242,8 @@ public class PcapJob<T> implements Statusable<Path> {
       Statusable<Path> statusable = query(jobName,
           basePath,
           baseInterimResultPath,
-          startTime,
-          endTime,
+          startTimeNs,
+          endTimeNs,
           numReducers,
           fields,
           // create a new copy for each job, bad things happen when hadoop config is reused


Mime
View raw message