Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8121B200CAC for ; Mon, 19 Jun 2017 12:36:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7FE4F160BE4; Mon, 19 Jun 2017 10:36:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A7463160BE1 for ; Mon, 19 Jun 2017 12:36:42 +0200 (CEST) Received: (qmail 21907 invoked by uid 500); 19 Jun 2017 10:36:41 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 21898 invoked by uid 99); 19 Jun 2017 10:36:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Jun 2017 10:36:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9BC14DFE22; Mon, 19 Jun 2017 10:36:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mgergely@apache.org To: commits@ambari.apache.org Message-Id: <3b1f88727f9f447ab136a01b7d01fdd7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-21181 Ability to anonymize data during log processing (mgergely) Date: Mon, 19 Jun 2017 10:36:41 +0000 (UTC) archived-at: Mon, 19 Jun 2017 10:36:44 -0000 Repository: ambari Updated Branches: refs/heads/trunk b7d422526 -> 49da047ec AMBARI-21181 Ability to anonymize data during log processing (mgergely) Change-Id: I93c62828a945567d75b638c475046cb77ad3f5e0 Project: http://git-wip-us.apache.org/repos/asf/ambari/repo 
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/49da047e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/49da047e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/49da047e Branch: refs/heads/trunk Commit: 49da047ecd0effd228c413425c73432f40073ac3 Parents: b7d4225 Author: Miklos Gergely Authored: Mon Jun 19 12:36:31 2017 +0200 Committer: Miklos Gergely Committed: Mon Jun 19 12:36:31 2017 +0200 ---------------------------------------------------------------------- .../inputconfig/MapAnonymizeDescriptor.java | 26 ++++ .../model/inputconfig/MapDateDescriptor.java | 2 +- .../model/inputconfig/MapFieldDescriptor.java | 2 +- .../inputconfig/MapFieldValueDescriptor.java | 2 +- .../impl/MapAnonymizeDescriptorImpl.java | 57 +++++++++ .../inputconfig/impl/PostMapValuesAdapter.java | 3 + .../docs/postMapValues.md | 9 ++ .../logfeeder/input/AbstractInputFile.java | 116 +++++++++--------- .../apache/ambari/logfeeder/input/Input.java | 5 +- .../ambari/logfeeder/input/InputManager.java | 5 + .../ambari/logfeeder/input/InputS3File.java | 6 + .../logfeeder/mapper/MapperAnonymize.java | 120 +++++++++++++++++++ .../apache/ambari/logfeeder/util/FileUtil.java | 7 ++ .../src/main/resources/alias_config.json | 3 + .../logfeeder/mapper/MapperAnonymizeTest.java | 79 ++++++++++++ .../src/test/resources/log4j.xml | 2 +- .../logsearch/model/common/LSServerMapDate.java | 3 - .../model/common/LSServerMapField.java | 3 + .../model/common/LSServerMapFieldAnonymize.java | 65 ++++++++++ .../LSServerPostMapValuesListDeserializer.java | 4 + .../shipper-conf/input.config-hst.json | 4 +- .../shipper-conf/input.config-zookeeper.json | 4 +- 22 files changed, 458 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java 
---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java new file mode 100644 index 0000000..2533155 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logsearch.config.api.model.inputconfig; + +public interface MapAnonymizeDescriptor extends MapFieldDescriptor { + String getPattern(); + + Character getHideChar(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java index f88435f..985d221 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java @@ -22,5 +22,5 @@ package org.apache.ambari.logsearch.config.api.model.inputconfig; public interface MapDateDescriptor extends MapFieldDescriptor { String getSourceDatePattern(); - public String getTargetDatePattern(); + String getTargetDatePattern(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java index db086c5..f5a2e35 100644 --- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java @@ -20,5 +20,5 @@ package org.apache.ambari.logsearch.config.api.model.inputconfig; public interface MapFieldDescriptor { - public String getJsonName(); + String getJsonName(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java index cf37e62..f039958 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java @@ -22,5 +22,5 @@ package org.apache.ambari.logsearch.config.api.model.inputconfig; public interface MapFieldValueDescriptor extends MapFieldDescriptor { String getPreValue(); - public String getPostValue(); + String getPostValue(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java ---------------------------------------------------------------------- diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java new file mode 100644 index 0000000..5fdbbab --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor; + +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class MapAnonymizeDescriptorImpl implements MapAnonymizeDescriptor { + @Override + public String getJsonName() { + return "map_anonymize"; + } + + @Expose + private String pattern; + + @Expose + @SerializedName("hide_char") + private Character hideChar; + + @Override + public String getPattern() { + return pattern; + } + + public void setPattern(String pattern) { + this.pattern = pattern; + } + + @Override + public Character getHideChar() { + return hideChar; + } + + public void setHideChar(Character hideChar) { + this.hideChar = hideChar; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java index 32aded8..3c21fd8 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java @@ -66,6 +66,9 @@ public class PostMapValuesAdapter implements JsonDeserializer" string. 
| +| hide\_char | The character to hide with, if it is not specified then the default is 'X' | http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index 2bc4e68..ab50eb7 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -28,6 +28,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; import org.apache.commons.lang.BooleanUtils; @@ -48,7 +49,6 @@ public abstract class AbstractInputFile extends Input { private String checkPointExtension; private File checkPointFile; - private RandomAccessFile checkPointWriter; private long lastCheckPointTimeMS; private int checkPointIntervalMS; private Map jsonCheckPoint; @@ -93,7 +93,6 @@ public abstract class AbstractInputFile extends Input { LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile); BufferedReader br = null; checkPointFile = null; - checkPointWriter = null; jsonCheckPoint = null; int lineCount = 0; @@ -105,6 +104,7 @@ public abstract class AbstractInputFile extends Input { boolean resume = true; int resumeFromLineNumber = getResumeFromLineNumber(); if (resumeFromLineNumber > 0) { + LOG.info("Resuming log file " + logPathFile.getAbsolutePath() + " from line 
number " + resumeFromLineNumber); resume = false; } @@ -211,27 +211,29 @@ public abstract class AbstractInputFile extends Input { String checkPointFileName = base64FileKey + checkPointExtension; File checkPointFolder = inputManager.getCheckPointFolderFile(); checkPointFile = new File(checkPointFolder, checkPointFileName); - checkPointWriter = new RandomAccessFile(checkPointFile, "rw"); - - try { - int contentSize = checkPointWriter.readInt(); - byte b[] = new byte[contentSize]; - int readSize = checkPointWriter.read(b, 0, contentSize); - if (readSize != contentSize) { - LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" + - readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription()); - } else { - String jsonCheckPointStr = new String(b, 0, readSize); - jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); - - resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); - - LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr + - ", resumeFromLineNumber=" + resumeFromLineNumber); + if (!checkPointFile.exists()) { + LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning"); + } else { + try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) { + int contentSize = checkPointWriter.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointWriter.read(b, 0, contentSize); + if (readSize != contentSize) { + LOG.error("Couldn't read expected number of bytes from checkpoint file. 
expected=" + contentSize + ", read=" + + readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription()); + } else { + String jsonCheckPointStr = new String(b, 0, readSize); + jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); + + resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); + + LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr + + ", resumeFromLineNumber=" + resumeFromLineNumber); + } + } catch (EOFException eofEx) { + LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " + + getShortDescription(), eofEx); } - } catch (EOFException eofEx) { - LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " + - getShortDescription()); } if (jsonCheckPoint == null) { // This seems to be first time, so creating the initial checkPoint object @@ -250,43 +252,48 @@ public abstract class AbstractInputFile extends Input { @Override public synchronized void checkIn(InputMarker inputMarker) { - if (checkPointWriter != null) { - try { - int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); - if (lineNumber > inputMarker.lineNumber) { - // Already wrote higher line number for this input - return; - } - // If interval is greater than last checkPoint time, then write - long currMS = System.currentTimeMillis(); - if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { - // Let's save this one so we can update the check point file on flush - lastCheckPointInputMarker = inputMarker; - return; - } - lastCheckPointTimeMS = currMS; + try { + int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); + if (lineNumber > inputMarker.lineNumber) { + // Already wrote higher line number for this input + return; + } + // If interval is greater than last checkPoint time, then write + 
long currMS = System.currentTimeMillis(); + if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { + // Let's save this one so we can update the check point file on flush + lastCheckPointInputMarker = inputMarker; + return; + } + lastCheckPointTimeMS = currMS; - jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber)); - jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); - jsonCheckPoint.put("last_write_time_date", new Date()); + jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber)); + jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); + jsonCheckPoint.put("last_write_time_date", new Date()); - String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); + String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); - // Let's rewind - checkPointWriter.seek(0); - checkPointWriter.writeInt(jsonStr.length()); - checkPointWriter.write(jsonStr.getBytes()); + File tmpCheckPointFile = new File(checkPointFile.getAbsolutePath() + ".tmp"); + if (tmpCheckPointFile.exists()) { + tmpCheckPointFile.delete(); + } + RandomAccessFile tmpRaf = new RandomAccessFile(tmpCheckPointFile, "rws"); + tmpRaf.writeInt(jsonStr.length()); + tmpRaf.write(jsonStr.getBytes()); + tmpRaf.getFD().sync(); + tmpRaf.close(); + + FileUtil.move(tmpCheckPointFile, checkPointFile); - if (isClosed()) { - String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() + - ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO); - } - } catch (Throwable t) { - String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. 
, input=" + getShortDescription(), t, - LOG, Level.ERROR); + if (isClosed()) { + String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() + + ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO); } + } catch (Throwable t) { + String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t, + LOG, Level.ERROR); } } @@ -302,6 +309,7 @@ public abstract class AbstractInputFile extends Input { super.close(); LOG.info("close() calling checkPoint checkIn(). " + getShortDescription()); lastCheckIn(); + isClosed = true; } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index fba596d..27d16c4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -59,7 +59,7 @@ public abstract class Input extends ConfigItem implements Runnable { private String type; protected String filePath; private Filter firstFilter; - private boolean isClosed; + protected boolean isClosed; protected boolean tail; private boolean useEventMD5; @@ -237,13 +237,10 @@ public abstract class Input extends ConfigItem implements Runnable { try { if (firstFilter != null) { firstFilter.close(); - } else { - 
outputManager.close(); } } catch (Throwable t) { // Ignore } - isClosed = true; } private void initCache() { http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java index 8c76785..19894ae 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java @@ -77,6 +77,11 @@ public class InputManager { for (Input input : inputList) { input.setDrain(true); } + for (Input input : inputList) { + while (!input.isClosed()) { + try { Thread.sleep(100); } catch (InterruptedException e) {} + } + } inputList.clear(); inputs.remove(serviceName); } http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java index 4bf162b..2b19503 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -92,4 +92,10 @@ public class InputS3File extends AbstractInputFile { protected Object getFileKey(File logFile) { return 
logFile.getPath(); } + + @Override + public void close() { + super.close(); + isClosed = true; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java new file mode 100644 index 0000000..c85ad49 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder.mapper; + +import java.util.Map; + +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; +import org.apache.commons.lang.CharUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import com.google.common.base.Splitter; + +public class MapperAnonymize extends Mapper { + private static final Logger LOG = Logger.getLogger(MapperAnonymize.class); + + private static final char DEFAULT_HIDE_CHAR = 'X'; + + private String pattern; + private Iterable patternParts; + private char hideChar; + + @Override + public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { + init(inputDesc, fieldName, mapClassCode); + + pattern = ((MapAnonymizeDescriptor)mapFieldDescriptor).getPattern(); + if (StringUtils.isEmpty(pattern)) { + LOG.fatal("pattern is empty."); + return false; + } + + patternParts = Splitter.on("").omitEmptyStrings().split(pattern); + hideChar = CharUtils.toChar(((MapAnonymizeDescriptor)mapFieldDescriptor).getHideChar(), DEFAULT_HIDE_CHAR); + + return true; + } + + @Override + public Object apply(Map jsonObj, Object value) { + if (value != null) { + try { + hide((String)value, jsonObj); + } catch (Throwable t) { + LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", "Error applying anonymization." 
+ + " pattern=" + pattern + ", hideChar=" + hideChar, t, LOG, Level.ERROR); + } + } + return value; + } + + private void hide(String value, Map jsonObj) { + StringBuilder sb = new StringBuilder(); + boolean first = true; + String rest = value; + for (String patternPart : patternParts) { + int pos = rest.indexOf(patternPart); + if (pos == -1) { + return; + } + + int end = pos + patternPart.length(); + if (first) { + if (pattern.startsWith("")) { + String beginning = rest.substring(0, pos); + int spacePos = beginning.lastIndexOf(" "); + if (spacePos == -1) { + sb.append(StringUtils.repeat(hideChar, beginning.length())); + } else { + sb.append(beginning.substring(0, spacePos+1)); + sb.append(StringUtils.repeat(hideChar, beginning.length() - spacePos - 1)); + } + sb.append(rest.substring(pos, end)); + } else { + sb.append(rest.substring(0, end)); + } + first = false; + } else { + sb.append(StringUtils.repeat(hideChar, pos)); + sb.append(rest.substring(pos, end)); + } + rest = rest.substring(end); + } + + if (pattern.endsWith("")) { + int spacePos = rest.indexOf(" "); + if (spacePos == -1) { + sb.append(StringUtils.repeat(hideChar, rest.length())); + rest = ""; + } else { + sb.append(StringUtils.repeat(hideChar, spacePos)); + rest = rest.substring(spacePos); + } + } + + sb.append(rest); + + jsonObj.put(fieldName, sb.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java index ffd6cec..94d6558 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java @@ -25,6 +25,7 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.HashMap; @@ -89,4 +90,10 @@ public class FileUtil { } return new HashMap(); } + + public static void move(File source, File target) throws IOException { + Path sourcePath = Paths.get(source.getAbsolutePath()); + Path targetPath = Paths.get(target.getAbsolutePath()); + Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json index e2ed625..4656b5b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json @@ -33,6 +33,9 @@ }, "map_fieldvalue": { "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldValue" + }, + "map_anonymize": { + "klass": "org.apache.ambari.logfeeder.mapper.MapperAnonymize" } }, "output": { http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java new file mode 100644 index 0000000..a0b96c0 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.logfeeder.mapper; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.MapAnonymizeDescriptorImpl; +import org.apache.log4j.Logger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MapperAnonymizeTest { + private static final Logger LOG = Logger.getLogger(MapperAnonymizeTest.class); + + @Test + public void testMapperAnonymize_anonymize() { + LOG.info("testMapperAnonymize_anonymize()"); + + MapAnonymizeDescriptorImpl mapAnonymizeDescriptorImpl = new MapAnonymizeDescriptorImpl(); + mapAnonymizeDescriptorImpl.setPattern("secret / is here"); + + MapperAnonymize mapperAnonymize = new MapperAnonymize(); + assertTrue("Could not initialize!", mapperAnonymize.init(null, "someField", null, mapAnonymizeDescriptorImpl)); + + Map jsonObj = new HashMap<>(); + mapperAnonymize.apply(jsonObj, "something else secret SECRET1 / SECRET2 is here something else 2"); + + assertEquals("Field wasnt anonymized", "something else secret XXXXXXX / XXXXXXX is here something else 2", jsonObj.remove("someField")); + assertTrue("jsonObj is not empty", jsonObj.isEmpty()); + } + + @Test + public void testMapperAnonymize_anonymize2() { + LOG.info("testMapperAnonymize_anonymize2()"); + + MapAnonymizeDescriptorImpl mapAnonymizeDescriptorImpl = new MapAnonymizeDescriptorImpl(); + mapAnonymizeDescriptorImpl.setPattern(" / is the secret"); + mapAnonymizeDescriptorImpl.setHideChar('*'); + + MapperAnonymize mapperAnonymize = new MapperAnonymize(); + assertTrue("Could not initialize!", mapperAnonymize.init(null, "someField", null, mapAnonymizeDescriptorImpl)); + + Map jsonObj = new HashMap<>(); + mapperAnonymize.apply(jsonObj, "something else SECRET1 / SECRET2 is the secret something else 2"); + + assertEquals("Field wasnt anonymized", "something else ******* / ******* is 
the secret something else 2", jsonObj.remove("someField")); + assertTrue("jsonObj is not empty", jsonObj.isEmpty()); + } + + @Test + public void testMapperAnonymize_noPattern() { + LOG.info("testMapperAnonymize_noPattern()"); + + MapAnonymizeDescriptorImpl mapAnonymizeDescriptorImpl = new MapAnonymizeDescriptorImpl(); + + MapperAnonymize mapperAnonymize = new MapperAnonymize(); + assertFalse("Was not able to initialize!", mapperAnonymize.init(null, "someField", null, mapAnonymizeDescriptorImpl)); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml index e641018..1d28fcc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. 
--> - + http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java index dcb0393..96e0287 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java @@ -23,11 +23,8 @@ import javax.validation.constraints.NotNull; import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonInclude.Include; -@JsonInclude(Include.NON_NULL) public class LSServerMapDate extends LSServerMapField { @Override public String getName() { http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java index b18439c..df33da1 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java 
@@ -20,11 +20,14 @@ package org.apache.ambari.logsearch.model.common; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; import io.swagger.annotations.ApiModel; @ApiModel @JsonIgnoreProperties(value = { "name" }) +@JsonInclude(Include.NON_NULL) public abstract class LSServerMapField { public abstract String getName(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java new file mode 100644 index 0000000..9fb589e --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logsearch.model.common; + +import javax.validation.constraints.NotNull; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import io.swagger.annotations.ApiModel; + +@ApiModel +public class LSServerMapFieldAnonymize extends LSServerMapField { + @Override + public String getName() { + return "map_anonymize"; + } + + @NotNull + private String pattern; + + @JsonProperty("hide_char") + private Character hideChar; + + public LSServerMapFieldAnonymize() {} + + public LSServerMapFieldAnonymize(MapAnonymizeDescriptor mapAnonymizeDescriptor) { + this.pattern = mapAnonymizeDescriptor.getPattern(); + this.hideChar = mapAnonymizeDescriptor.getHideChar(); + } + + public String getPattern() { + return pattern; + } + + public void setPattern(String pattern) { + this.pattern = pattern; + } + + public Character getHideChar() { + return hideChar; + } + + public void setHideChar(Character hideChar) { + this.hideChar = hideChar; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java index 18744e2..258f64a 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java @@ -64,6 +64,10 @@ public class 
LSServerPostMapValuesListDeserializer extends JsonDeserializer