metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject [11/21] metron git commit: METRON-1378: Create a summarizer closes apache/incubator-metron#879
Date Thu, 25 Jan 2018 14:05:34 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
new file mode 100644
index 0000000..7042e86
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.dataloads.extractor.ExtractorCapabilities;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.StatefulExtractor;
+import org.apache.metron.dataloads.nonbulk.flatfile.SummarizeOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writer;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LocalSummarizer extends AbstractLocalImporter<SummarizeOptions, LocalSummarizer.SummarizationState>
{
+  List<SummarizationState> stateList;
+
+  public LocalSummarizer() {
+    stateList = Collections.synchronizedList(new ArrayList<>());
+  }
+
+  public static class SummarizationState {
+    AtomicReference<Object> state;
+    StatefulExtractor extractor;
+    public SummarizationState(StatefulExtractor extractor, Object initState) {
+      this.state = new AtomicReference<>(initState);
+      this.extractor = extractor;
+    }
+
+    public AtomicReference<Object> getState() {
+      return state;
+    }
+
+    public StatefulExtractor getExtractor() {
+      return extractor;
+    }
+
+  }
+
+  @Override
+  protected boolean isQuiet(EnumMap<SummarizeOptions, Optional<Object>> config)
{
+    return (boolean) config.getOrDefault(SummarizeOptions.QUIET, Optional.of(false)).get();
+  }
+
+  @Override
+  protected int batchSize(EnumMap<SummarizeOptions, Optional<Object>> config)
{
+    return (int) config.getOrDefault(SummarizeOptions.BATCH_SIZE, Optional.of(1)).get();
+  }
+
+  @Override
+  protected int numThreads(EnumMap<SummarizeOptions, Optional<Object>> config,
ExtractorHandler handler) {
+    if(handler.getExtractor().getCapabilities().contains(ExtractorCapabilities.MERGEABLE))
{
+      return (int) config.get(SummarizeOptions.NUM_THREADS).get();
+    }
+    else {
+      //force one thread in the case it's not mergeable.
+      return 1;
+    }
+  }
+
+  @Override
+  protected void validateState(EnumMap<SummarizeOptions, Optional<Object>> config,
ExtractorHandler handler) {
+    if(!(handler.getExtractor() instanceof StatefulExtractor)){
+      throw new IllegalStateException("Extractor must be a stateful extractor and " + handler.getExtractor().getClass().getName()
+ " is not.");
+    }
+    assertOption(config, SummarizeOptions.OUTPUT);
+    if(!handler.getExtractor().getCapabilities().contains(ExtractorCapabilities.STATEFUL))
{
+      throw new IllegalStateException("Unable to operate on a non-stateful extractor.  "
+
+              "If you have not specified \"stateUpdate\" in your Extractor config, there
is nothing to do here and nothing will be written.");
+    }
+
+  }
+
+  @Override
+  protected ThreadLocal<SummarizationState> createState(EnumMap<SummarizeOptions,
Optional<Object>> config, Configuration hadoopConfig, ExtractorHandler handler) {
+    final StatefulExtractor extractor = (StatefulExtractor)handler.getExtractor();
+    return ThreadLocal.withInitial(() -> {
+      Object initState = extractor.initializeState(handler.getConfig());
+      SummarizationState ret = new SummarizationState(extractor, initState);
+      stateList.add(ret);
+      return ret;
+    });
+  }
+
+
+  @Override
+  protected void extract(SummarizationState state, String line) throws IOException {
+    state.getExtractor().extract(line, state.getState());
+  }
+
+  @Override
+  public void importData(EnumMap<SummarizeOptions, Optional<Object>> config,
ExtractorHandler handler, Configuration hadoopConfig) throws IOException, InvalidWriterOutput
{
+    Writer writer = (Writer) config.get(SummarizeOptions.OUTPUT_MODE).get();
+    Optional<String> fileName = Optional.ofNullable((String)config.get(SummarizeOptions.OUTPUT).orElse(null));
+    writer.validate(fileName, hadoopConfig);
+    super.importData(config, handler, hadoopConfig);
+    StatefulExtractor extractor = (StatefulExtractor) handler.getExtractor();
+    Object finalState = null;
+    if(stateList.size() == 1) {
+      finalState = stateList.get(0).getState().get();
+    }
+    else if(stateList.size() > 1) {
+      List<Object> states = new ArrayList<>();
+      for(SummarizationState s : stateList) {
+        states.add(s.getState().get());
+      }
+      finalState = extractor.mergeStates(states);
+    }
+    writer.write(finalState, fileName, hadoopConfig);
+  }
+
+  @Override
+  protected List<String> getInputs(EnumMap<SummarizeOptions, Optional<Object>>
config) {
+    Object o = config.get(SummarizeOptions.INPUT).get();
+    if(o == null) {
+      return new ArrayList<>();
+    }
+    if(o instanceof String) {
+      return ImmutableList.of((String)o);
+    }
+    return (List<String>) config.get(SummarizeOptions.INPUT).get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
index 401ace2..1b34ed4 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public enum MapReduceImporter implements Importer{
+public enum MapReduceImporter implements Importer<LoadOptions> {
   INSTANCE
   ;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
new file mode 100644
index 0000000..180aa23
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import java.util.Optional;
+
+public enum Summarizers {
+  LOCAL(new LocalSummarizer());
+
+  private Importer importer;
+
+  Summarizers(Importer importer) {
+    this.importer = importer;
+  }
+
+  public Importer getSummarizer() {
+    return importer;
+  }
+
+  public static Optional<Summarizers> getStrategy(String strategyName) {
+    if(strategyName == null) {
+      return Optional.empty();
+    }
+    for(Summarizers strategy : values()) {
+      if(strategy.name().equalsIgnoreCase(strategyName.trim())) {
+        return Optional.of(strategy);
+      }
+    }
+    return Optional.empty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
new file mode 100644
index 0000000..22f4aa1
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class ConsoleWriter implements Writer{
+  @Override
+  public void validate(Optional<String> output, Configuration hadoopConfig) {
+
+  }
+
+  @Override
+  public void write(Object obj, Optional<String> output, Configuration hadoopConfig)
throws IOException {
+    System.out.println(obj);
+  }
+
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig)
throws IOException {
+    System.out.println(SerDeUtils.fromBytes(obj, Object.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
new file mode 100644
index 0000000..1c0c726
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class HDFSWriter implements Writer {
+  @Override
+  public void validate(Optional<String> fileNameOptional, Configuration hadoopConfig)
throws InvalidWriterOutput {
+    if(!fileNameOptional.isPresent()) {
+      throw new InvalidWriterOutput("Filename is not present.");
+    }
+    String fileName = fileNameOptional.get();
+    if(StringUtils.isEmpty(fileName) || fileName.trim().equals(".") || fileName.trim().equals("..")
|| fileName.trim().endsWith("/")) {
+      throw new InvalidWriterOutput("Filename is empty or otherwise invalid.");
+    }
+  }
+
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig)
throws IOException {
+    FileSystem fs = FileSystem.get(hadoopConfig);
+    try(FSDataOutputStream stream = fs.create(new Path(output.get()))) {
+      IOUtils.write(obj, stream);
+      stream.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
new file mode 100644
index 0000000..7c237c8
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+public class InvalidWriterOutput extends Exception {
+  public InvalidWriterOutput(String message) {
+    super(message);
+  }
+
+  public InvalidWriterOutput(String message, Throwable t) {
+    super(message, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
new file mode 100644
index 0000000..d8bda81
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+public class LocalWriter implements Writer {
+
+  @Override
+  public void validate(Optional<String> fileNameOptional, Configuration hadoopConfig)
throws InvalidWriterOutput {
+    if(!fileNameOptional.isPresent()) {
+      throw new InvalidWriterOutput("Filename is not present.");
+    }
+    String fileName = fileNameOptional.get();
+    if(StringUtils.isEmpty(fileName) || fileName.trim().equals(".") || fileName.trim().equals("..")
|| fileName.trim().endsWith("/")) {
+      throw new InvalidWriterOutput("Filename is empty or otherwise invalid.");
+    }
+  }
+
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig)
throws IOException {
+    File outFile = new File(output.get());
+    if(!outFile.getParentFile().exists()) {
+      outFile.getParentFile().mkdirs();
+    }
+    try(FileOutputStream fs = new FileOutputStream(outFile)) {
+      IOUtils.write(obj, fs);
+      fs.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
new file mode 100644
index 0000000..ba13ba1
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public interface Writer {
+  void validate(Optional<String> output, Configuration hadoopConfig) throws InvalidWriterOutput;
+  default void write(Object obj, Optional<String> output, Configuration hadoopConfig)
throws IOException {
+    if(obj != null) {
+      write(SerDeUtils.toBytes(obj), output, hadoopConfig);
+    }
+  }
+  void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws
IOException;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
new file mode 100644
index 0000000..785ad21
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public enum Writers implements Writer {
+  LOCAL(new LocalWriter()),
+  HDFS(new HDFSWriter()),
+  CONSOLE(new ConsoleWriter())
+  ;
+  private Writer writer;
+
+  Writers(Writer writer) {
+    this.writer = writer;
+  }
+  public static Optional<Writers> getStrategy(String strategyName) {
+    if(strategyName == null) {
+      return Optional.empty();
+    }
+    for(Writers strategy : values()) {
+      if(strategy.name().equalsIgnoreCase(strategyName.trim())) {
+        return Optional.of(strategy);
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public void validate(Optional<String> output, Configuration hadoopConf) throws InvalidWriterOutput
{
+    writer.validate(output, hadoopConf);
+  }
+
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConf)
throws IOException {
+    writer.write(obj, output, hadoopConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
b/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
new file mode 100755
index 0000000..018d61a
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
# Source the bigtop HBase defaults file when it is readable.
[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"

# Autodetect JAVA_HOME if not defined
if [ -e /usr/libexec/bigtop-detect-javahome ]; then
  . /usr/libexec/bigtop-detect-javahome
elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
  . /usr/lib/bigtop-utils/bigtop-detect-javahome
fi

# NOTE(review): ${project.version} / ${project.artifactId} look like build-time (Maven) placeholders.
export METRON_VERSION=${project.version}
export METRON_HOME=/usr/metron/$METRON_VERSION
export CLASSNAME="org.apache.metron.dataloads.nonbulk.flatfile.SimpleFlatFileSummarizer"
export DM_JAR=${project.artifactId}-$METRON_VERSION.jar
export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
export HADOOP_OPTS="$HADOOP_OPTS $METRON_JVMFLAGS"
# `command -v` is the portable existence check; `[ $(which hadoop) ]` breaks on paths with spaces.
if command -v hadoop >/dev/null 2>&1
then
  HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:$("${HBASE_HOME}/bin/hbase" classpath)
  # Collect every existing jar on the classpath into a comma-separated -libjars list.
  for jar in $(echo "$HADOOP_CLASSPATH" | sed 's/:/ /g'); do
    if [ -f "$jar" ]; then
      LIBJARS="$jar,$LIBJARS"
    fi
  done
  export HADOOP_CLASSPATH
  hadoop jar "$METRON_HOME/lib/$DM_JAR" "$CLASSNAME" -libjars ${LIBJARS} "$@"
else
  echo "Warning: Metron cannot find the hadoop client on this node.  This means that loading via Map Reduce will NOT function."
  CP=$METRON_HOME/lib/$DM_JAR:/usr/metron/${METRON_VERSION}/lib/taxii-1.1.0.1.jar:$("${HBASE_HOME}/bin/hbase" classpath)
  # METRON_JVMFLAGS is intentionally unquoted so multiple flags become separate arguments.
  java $METRON_JVMFLAGS -cp "$CP" "$CLASSNAME" "$@"
fi
+

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
new file mode 100644
index 0000000..17e3206
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.LocalSummarizer;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.RawLocation;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writer;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SimpleFlatFileSummarizerTest {
+  /**
+   {
+   "config" : {
+     "columns" : {
+       "rank" : 0,
+       "domain" : 1
+     },
+     "value_transform" : {
+       "domain" : "DOMAIN_REMOVE_TLD(domain)"
+     },
+     "value_filter" : "LENGTH(domain) > 0",
+     "state_init" : "MULTISET_INIT()",
+     "state_update" : {
+       "state" : "MULTISET_ADD(state, domain)"
+     },
+     "state_merge" : "MULTISET_MERGE(states)",
+     "separator" : ","
+     },
+     "extractor" : "CSV"
+   }
+   */
+  @Multiline
+  public static String stellarExtractorConfigLineByLine;
+
+  /**
+   {
+   "config" : {
+     "columns" : {
+       "rank" : 0,
+       "domain" : 1
+     },
+     "value_transform" : {
+       "domain" : "DOMAIN_REMOVE_TLD(domain)"
+     },
+     "value_filter" : "LENGTH(domain) > 0",
+     "state_init" : "MULTISET_INIT()",
+     "state_update" : {
+       "state" : "MULTISET_ADD(state, domain)"
+     },
+     "state_merge" : "MULTISET_MERGE(states)",
+     "separator" : ","
+     },
+     "extractor" : "CSV",
+     "inputFormat" : "WHOLE_FILE"
+   }
+   */
+  @Multiline
+  public static String stellarExtractorConfigWholeFile;
+
+
+  public static List<String> domains = ImmutableList.of(
+          "google.com",
+          "youtube.com",
+          "facebook.com",
+          "baidu.com",
+          "wikipedia.org",
+          "yahoo.com",
+          "google.co.in",
+          "reddit.com",
+          "qq.com",
+          "amazon.com",
+          "taobao.com",
+          "tmall.com",
+          "twitter.com",
+          "live.com",
+          "vk.com",
+          "google.co.jp",
+          "instagram.com",
+          "sohu.com",
+          "sina.com.cn",
+          "jd.com"
+  );
+
+  public static String generateData() {
+    List<String> tmp = new ArrayList<>();
+    int i = 1;
+    for(String d : domains) {
+      tmp.add(i + "," + d);
+    }
+    return Joiner.on("\n").join(tmp);
+  }
+
+  @Test
+  public void testArgs() throws Exception {
+    String[] argv = { "-e extractor.json"
+            , "-o out.ser"
+            , "-l log4j", "-i input.csv"
+            , "-p 2", "-b 128", "-q"
+    };
+
+    Configuration config = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(config, argv).getRemainingArgs();
+
+    CommandLine cli = SummarizeOptions.parse(new PosixParser(), otherArgs);
+    Assert.assertEquals("extractor.json", SummarizeOptions.EXTRACTOR_CONFIG.get(cli).trim());
+    Assert.assertEquals("input.csv", SummarizeOptions.INPUT.get(cli).trim());
+    Assert.assertEquals("log4j", SummarizeOptions.LOG4J_PROPERTIES.get(cli).trim());
+    Assert.assertEquals("2", SummarizeOptions.NUM_THREADS.get(cli).trim());
+    Assert.assertEquals("128", SummarizeOptions.BATCH_SIZE.get(cli).trim());
+  }
+
+  public static class InMemoryLocation implements RawLocation {
+    Map<String, String> inMemoryData;
+    public InMemoryLocation(Map<String, String> inMemoryData)
+    {
+      this.inMemoryData = inMemoryData;
+    }
+
+    @Override
+    public Optional<List<String>> list(String loc) throws IOException {
+      if(loc.equals(".")) {
+        ArrayList<String> ret = new ArrayList<>(inMemoryData.keySet());
+        return Optional.of(ret);
+      }
+      return Optional.empty();
+    }
+
+    @Override
+    public boolean exists(String loc) {
+      return loc.equals(".") ? true:inMemoryData.containsKey(loc);
+    }
+
+    @Override
+    public boolean isDirectory(String loc) throws IOException {
+      return loc.equals(".")?true:false;
+    }
+
+    @Override
+    public InputStream openInputStream(String loc) throws IOException {
+      return new ByteArrayInputStream(inMemoryData.get(loc).getBytes());
+    }
+
+    @Override
+    public boolean match(String loc) {
+      return exists(loc);
+    }
+  }
+
+  public class MockSummarizer extends LocalSummarizer {
+    Map<String, String> mockData;
+    public MockSummarizer(Map<String, String> mockData) {
+      this.mockData = mockData;
+    }
+
+    @Override
+    protected List<Location> getLocationsRecursive(List<String> inputs, FileSystem
fs) throws IOException {
+      Set<Location> ret = new HashSet<>();
+      for(String input : inputs) {
+        if(input.equals(".")) {
+          for(String s : mockData.keySet()) {
+            ret.add(resolveLocation(s, fs));
+          }
+        }
+        else {
+          ret.add(resolveLocation(input, fs));
+        }
+      }
+      return new ArrayList<>(ret);
+    }
+
+    @Override
+    protected Location resolveLocation(String input, FileSystem fs) {
+      return new Location(input, new InMemoryLocation(mockData));
+    }
+  }
+
+  public static class PeekingWriter implements Writer {
+    AtomicReference<Object> ref;
+    public PeekingWriter(AtomicReference<Object> ref) {
+      this.ref = ref;
+    }
+
+    @Override
+    public void validate(Optional<String> output, Configuration hadoopConfig) {
+
+    }
+    @Override
+    public void write(Object obj, Optional<String> output, Configuration hadoopConfig)
throws IOException {
+      ref.set(obj);
+    }
+
+    @Override
+    public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig)
throws IOException {
+
+    }
+  }
+
+  @Test
+  public void testLineByLine() throws IOException, InvalidWriterOutput {
+    testLineByLine(5);
+    testLineByLine(1);
+  }
+
+  public void testLineByLine(final int numThreads) throws IOException, InvalidWriterOutput
{
+    ExtractorHandler handler = ExtractorHandler.load(stellarExtractorConfigLineByLine);
+    LocalSummarizer summarizer = new MockSummarizer(
+            ImmutableMap.of("input.csv", generateData())
+    );
+    final AtomicReference<Object> finalObj = new AtomicReference<>(null);
+    EnumMap<SummarizeOptions, Optional<Object>> options = new EnumMap<SummarizeOptions,
Optional<Object>>(SummarizeOptions.class) {{
+      put(SummarizeOptions.INPUT, Optional.of("input.csv"));
+      put(SummarizeOptions.BATCH_SIZE, Optional.of(5));
+      put(SummarizeOptions.QUIET, Optional.of(true));
+      put(SummarizeOptions.OUTPUT_MODE, Optional.of(new PeekingWriter(finalObj)));
+      put(SummarizeOptions.OUTPUT, Optional.of("out"));
+      put(SummarizeOptions.NUM_THREADS, Optional.of(numThreads));
+    }};
+    summarizer.importData(options, handler, new Configuration());
+    String expr = "MAP_GET(DOMAIN_REMOVE_TLD(domain), s) > 0";
+    for(String domain : domains) {
+      Boolean b = (Boolean)StellarProcessorUtils.run(expr, ImmutableMap.of("s", finalObj.get(),
"domain", domain));
+      Assert.assertTrue("Can't find " + domain, b);
+    }
+  }
+
+  @Test
+  public void testWholeFile() throws Exception {
+    testWholeFile(5);
+    testWholeFile(1);
+  }
+
+  public void testWholeFile(final int numThreads) throws IOException, InvalidWriterOutput
{
+    ExtractorHandler handler = ExtractorHandler.load(stellarExtractorConfigWholeFile);
+    LocalSummarizer summarizer = new MockSummarizer(
+            new HashMap<String, String>() {{
+              for(String domain : domains) {
+                put(domain, "1," + domain);
+              }
+            }}
+    );
+    final AtomicReference<Object> finalObj = new AtomicReference<>(null);
+    EnumMap<SummarizeOptions, Optional<Object>> options = new EnumMap<SummarizeOptions,
Optional<Object>>(SummarizeOptions.class) {{
+      put(SummarizeOptions.INPUT, Optional.of("."));
+      put(SummarizeOptions.BATCH_SIZE, Optional.of(5));
+      put(SummarizeOptions.QUIET, Optional.of(true));
+      put(SummarizeOptions.OUTPUT_MODE, Optional.of(new PeekingWriter(finalObj)));
+      put(SummarizeOptions.OUTPUT, Optional.of("out"));
+      put(SummarizeOptions.NUM_THREADS, Optional.of(numThreads));
+    }};
+    summarizer.importData(options, handler, new Configuration());
+    String expr = "MAP_GET(DOMAIN_REMOVE_TLD(domain), s) > 0";
+    for(String domain : domains) {
+      Boolean b = (Boolean)StellarProcessorUtils.run(expr, ImmutableMap.of("s", finalObj.get(),
"domain", domain));
+      Assert.assertTrue("Can't find " + domain, b);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
index ab8ced1..1d2655d 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.stellar.dsl.functions;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.google.common.net.InternetDomainName;
@@ -229,7 +230,13 @@ public class NetworkFunctions {
   private static String extractTld(InternetDomainName idn, String dn) {
 
     if(idn != null && idn.hasPublicSuffix()) {
-      return idn.publicSuffix().toString();
+      String ret = idn.publicSuffix().toString();
+      if(ret.startsWith("InternetDomainName")) {
+        return Joiner.on(".").join(idn.publicSuffix().parts());
+      }
+      else {
+        return ret;
+      }
     }
     else if(dn != null) {
       StringBuffer tld = new StringBuffer("");


Mime
View raw message