ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oleew...@apache.org
Subject [2/2] ambari git commit: AMBARI-17561. Enable simulating logfeeder inputs (Miklos Gergely via oleewere)
Date Wed, 06 Jul 2016 10:28:18 GMT
AMBARI-17561. Enable simulating logfeeder inputs (Miklos Gergely via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c5684492
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c5684492
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c5684492

Branch: refs/heads/trunk
Commit: c56844922cb958c2879c292b0b57d2e5edf4a8a4
Parents: c80ccd8
Author: Miklos Gergely <mgergely@hortonworks.com>
Authored: Wed Jul 6 12:14:21 2016 +0200
Committer: oleewere <oleewere@gmail.com>
Committed: Wed Jul 6 12:23:57 2016 +0200

----------------------------------------------------------------------
 .../org/apache/ambari/logfeeder/LogFeeder.java  |  21 ++-
 .../ambari/logfeeder/input/InputSimulate.java   | 158 +++++++++++++++++++
 .../src/main/resources/alias_config.json        | 105 ++++++------
 3 files changed, 231 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c5684492/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 88a6737..8697f54 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -40,6 +40,7 @@ import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
 import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputSimulate;
 import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.util.FileUtil;
@@ -118,6 +119,9 @@ public class LogFeeder {
           + configFileName);
       }
     }
+    
+    addSimulatedInputs();
+    
     mergeAllConfigs();
     
     LogfeederScheduler.INSTANCE.start();
@@ -196,7 +200,22 @@ public class LogFeeder {
         outputConfigList.addAll(mapList);
       }
     }
-
+  }
+  
+  private void addSimulatedInputs() {
+    int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number",
0);
+    if (simulatedInputNumber == 0)
+      return;
+    
+    InputSimulate.loadTypeToFilePath(inputConfigList);
+    inputConfigList.clear();
+    
+    for (int i = 0; i < simulatedInputNumber; i++) {
+      HashMap<String, Object> mapList = new HashMap<String, Object>();
+      mapList.put("source", "simulate");
+      mapList.put("rowtype", "service");
+      inputConfigList.add(mapList);
+    }
   }
 
   private void mergeAllConfigs() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c5684492/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
new file mode 100644
index 0000000..72a0586
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import java.net.Inet4Address;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.filter.JSONFilterCode;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.solr.common.util.Base64;
+
+public class InputSimulate extends Input {
+
+  private static final String LOG_MESSAGE_PREFIX = "Simulated log message for testing, line";
+  
+  private static final String LOG_TEXT_PATTERN =
+      "{ logtime=\"%d\", level=\"%s\", log_message=\"<LOG_MESSAGE_PATTERN>\"}";
+  
+  private static final Map<String, String> typeToFilePath = new HashMap<>();
+  public static void loadTypeToFilePath(List<Map<String, Object>> inputList)
{
+    for (Map<String, Object> input : inputList) {
+      if (input.containsKey("type") && input.containsKey("path")) {
+        typeToFilePath.put((String)input.get("type"), (String)input.get("path"));
+      }
+    }
+  }
+  
+  private static final Map<String, Integer> typeToLineNumber = new HashMap<>();
+  
+  private final Random random = new Random(System.currentTimeMillis());
+  
+  private final List<String> types;
+  private final String level;
+  private final String logText;
+  private final long sleepMillis;
+  
+  public InputSimulate() throws Exception {
+    this.types = getSimulatedLogTypes();
+    this.level = LogFeederUtil.getStringProperty("logfeeder.simulate.log_level", "WARN");
+    this.logText = getLogText();
+    this.sleepMillis = LogFeederUtil.getIntProperty("logfeeder.simulate.sleep_milliseconds",
10000);
+    
+    Filter filter = new JSONFilterCode();
+    filter.setInput(this);
+    setFirstFilter(filter);
+  }
+  
+  private List<String> getSimulatedLogTypes() {
+    String logsToSimulate = LogFeederUtil.getStringProperty("logfeeder.simulate.log_ids");
+    if (logsToSimulate == null) {
+      return new ArrayList<>(typeToFilePath.keySet());
+    } else {
+      List<String> simulatedLogTypes = Arrays.asList(logsToSimulate.split(","));
+      simulatedLogTypes.retainAll(typeToFilePath.keySet());
+      return simulatedLogTypes;
+    }
+  }
+
+  private String getLogText() {
+    int logTextSize = LogFeederUtil.getIntProperty("logfeeder.simulate.log_message_size",
100);
+    int fillerSize = Math.max(logTextSize - LOG_MESSAGE_PREFIX.length() - 10, 0);
+    String filler = StringUtils.repeat("X", fillerSize);
+    String logMessagePattern = LOG_MESSAGE_PREFIX + " %08d " + filler;
+    
+    return LOG_TEXT_PATTERN.replaceAll("<LOG_MESSAGE_PATTERN>", logMessagePattern);
+  }
+  
+  @Override
+  public String getNameForThread() {
+    return "Simulated input";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Simulated input";
+  }
+  
+  @Override
+  void start() throws Exception {
+    if (types.isEmpty())
+      return;
+    
+    getFirstFilter().setOutputMgr(outputMgr);
+    while (true) {
+      String type = imitateRandomLogFile();
+      
+      InputMarker marker = getInputMarker(type);
+      String line = getLine(marker);
+      
+      outputLine(line, marker);
+      
+      try { Thread.sleep(sleepMillis); } catch(Exception e) {}
+    }
+  }
+
+  private String imitateRandomLogFile() {
+    int typePos = random.nextInt(types.size());
+    String type = types.get(typePos);
+    String filePath = typeToFilePath.get(type);
+    
+    configs.put("type", type);
+    setFilePath(filePath);
+    
+    return type;
+  }
+
+  private InputMarker getInputMarker(String type) throws Exception {
+    InputMarker marker = new InputMarker();
+    marker.input = this;
+    marker.lineNumber = getLineNumber(type);
+    marker.base64FileKey = getBase64FileKey();
+    return marker;
+  }
+
+  private static synchronized int getLineNumber(String type) {
+    if (!typeToLineNumber.containsKey(type)) {
+      typeToLineNumber.put(type, 0);
+    }
+    Integer lineNumber = typeToLineNumber.get(type) + 1;
+    
+    typeToLineNumber.put(type, lineNumber);
+    return lineNumber;
+  }
+
+  private String getBase64FileKey() throws Exception {
+    String fileKey = Inet4Address.getLocalHost().getHostAddress() + "|" + filePath;
+    return Base64.byteArrayToBase64(fileKey.getBytes());
+  }
+
+  private String getLine(InputMarker marker) {
+    Date d = new Date();
+    return String.format(logText, d.getTime(), level, marker.lineNumber);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c5684492/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index 978f581..bc221a0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -1,54 +1,55 @@
 {
-	"input": {
-		"file": {
-			"klass": "org.apache.ambari.logfeeder.input.InputFile"
-		},
-		"s3_file": {
-			"klass": "org.apache.ambari.logfeeder.input.InputS3File"
-		}
-
-	},
-	"filter": {
-		"json": {
-			"klass": "org.apache.ambari.logfeeder.filter.JSONFilterCode"
-		},
-		"keyvalue": {
-			"klass": "org.apache.ambari.logfeeder.filter.FilterKeyValue"
-		},
-		"grok": {
-			"klass": "org.apache.ambari.logfeeder.filter.FilterGrok"
-		}
-	},
-	  
-	 "mapper": {
-		"map_date": {
-			"klass": "org.apache.ambari.logfeeder.mapper.MapperDate"
-		},
-		"map_fieldname": {
-			"klass": "org.apache.ambari.logfeeder.mapper.MapperFieldName"
-		},
-		"map_fieldvalue": {
-			"klass": "org.apache.ambari.logfeeder.mapper.MapperFieldValue"
-		}
-	},
-	  "output": {
-		"solr": {
-			"klass": "org.apache.ambari.logfeeder.output.OutputSolr"
-		},
-		"file": {
-			"klass": "org.apache.ambari.logfeeder.output.OutputFile"
-		},
-		"kafka": {
-			"klass": "org.apache.ambari.logfeeder.output.OutputKafka"
-		},
-		"dev_null": {
-			"klass": "org.apache.ambari.logfeeder.output.OutputDevNull"
-		},
-		"s3_file": {
-			"klass": "org.apache.ambari.logfeeder.output.OutputS3File"
- 		},
- 		"hdfs": {
-			"klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile"
-  		}
-	}
+"input": {
+    "file": {
+      "klass": "org.apache.ambari.logfeeder.input.InputFile"
+    },
+    "s3_file": {
+      "klass": "org.apache.ambari.logfeeder.input.InputS3File"
+    },
+    "simulate": {
+      "klass": "org.apache.ambari.logfeeder.input.InputSimulate"
+    }
+  },
+  "filter": {
+    "json": {
+      "klass": "org.apache.ambari.logfeeder.filter.JSONFilterCode"
+    },
+    "keyvalue": {
+      "klass": "org.apache.ambari.logfeeder.filter.FilterKeyValue"
+    },
+    "grok": {
+      "klass": "org.apache.ambari.logfeeder.filter.FilterGrok"
+    }
+  },
+  "mapper": {
+    "map_date": {
+      "klass": "org.apache.ambari.logfeeder.mapper.MapperDate"
+    },
+    "map_fieldname": {
+      "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldName"
+    },
+    "map_fieldvalue": {
+      "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldValue"
+    }
+  },
+  "output": {
+    "solr": {
+      "klass": "org.apache.ambari.logfeeder.output.OutputSolr"
+    },
+    "file": {
+      "klass": "org.apache.ambari.logfeeder.output.OutputFile"
+    },
+    "kafka": {
+      "klass": "org.apache.ambari.logfeeder.output.OutputKafka"
+    },
+    "dev_null": {
+      "klass": "org.apache.ambari.logfeeder.output.OutputDevNull"
+    },
+    "s3_file": {
+      "klass": "org.apache.ambari.logfeeder.output.OutputS3File"
+    },
+    "hdfs": {
+      "klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile"
+    }
+  }
 }
\ No newline at end of file


Mime
View raw message