Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B73C0200C8B for ; Mon, 22 May 2017 12:50:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B5ADA160BA5; Mon, 22 May 2017 10:50:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3DE43160BD4 for ; Mon, 22 May 2017 12:50:14 +0200 (CEST) Received: (qmail 81255 invoked by uid 500); 22 May 2017 10:50:13 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 81151 invoked by uid 99); 22 May 2017 10:50:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 May 2017 10:50:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F776DF9AE; Mon, 22 May 2017 10:50:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mgergely@apache.org To: commits@ambari.apache.org Date: Mon, 22 May 2017 10:50:15 -0000 Message-Id: In-Reply-To: <2f32e152209b482ba8f146f844d987f4@git.apache.org> References: <2f32e152209b482ba8f146f844d987f4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] ambari git commit: AMBARI-21033 Log Search use POJOs for input configuration (mgergely) archived-at: Mon, 22 May 2017 10:50:16 -0000 http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java new file mode 100644 index 0000000..51c7ec8 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; + +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class InputFileBaseDescriptorImpl extends InputDescriptorImpl implements InputFileBaseDescriptor { + @Expose + @SerializedName("checkpoint_interval_ms") + private Integer checkpointIntervalMs; + + @Expose + @SerializedName("process_file") + private Boolean processFile; + + @Expose + @SerializedName("copy_file") + private Boolean copyFile; + + @Override + public Boolean getProcessFile() { + return processFile; + } + + public void setProcessFile(Boolean processFile) { + this.processFile = processFile; + } + + @Override + public Boolean getCopyFile() { + return copyFile; + } + + public void setCopyFile(Boolean copyFile) { + this.copyFile = copyFile; + } + + @Override + public Integer getCheckpointIntervalMs() { + return checkpointIntervalMs; + } + + public void setCheckpointIntervalMs(Integer checkpointIntervalMs) { + this.checkpointIntervalMs = checkpointIntervalMs; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java new file mode 100644 index 0000000..3bfd161 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor; + +public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputFileDescriptor { +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java new file mode 100644 index 0000000..277a57c --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor; + +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class InputS3FileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputS3FileDescriptor { + @Expose + @SerializedName("s3_access_key") + private String s3AccessKey; + + @Expose + @SerializedName("s3_secret_key") + private String s3SecretKey; + + @Override + public String getS3AccessKey() { + return s3AccessKey; + } + + public void setS3AccessKey(String s3AccessKey) { + this.s3AccessKey = s3AccessKey; + } + + @Override + public String getS3SecretKey() { + return s3SecretKey; + } + + public void setS3SecretKey(String s3SecretKey) { + this.s3SecretKey = s3SecretKey; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java new file mode 100644 index 0000000..9daad2b --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor; + +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class MapDateDescriptorImpl implements MapDateDescriptor { + @Override + public String getJsonName() { + return "map_date"; + } + + @Expose + @SerializedName("source_date_pattern") + private String sourceDatePattern; + + @Expose + @SerializedName("target_date_pattern") + private String targetDatePattern; + + @Override + public String getSourceDatePattern() { + return sourceDatePattern; + } + + public void setSourceDatePattern(String sourceDatePattern) { + this.sourceDatePattern = sourceDatePattern; + } + + @Override + public String getTargetDatePattern() { + return targetDatePattern; + } + + public void setTargetDatePattern(String targetDatePattern) { + this.targetDatePattern = targetDatePattern; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java new file mode 100644 index 0000000..4a8d746 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor; + +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class MapFieldCopyDescriptorImpl implements MapFieldCopyDescriptor { + @Override + public String getJsonName() { + return "map_fieldcopy"; + } + + @Expose + @SerializedName("copy_name") + private String copyName; + + @Override + public String getCopyName() { + return copyName; + } + + public void setCopyName(String copyName) { + this.copyName = copyName; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java new file mode 100644 index 0000000..333cb67 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor; + +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class MapFieldNameDescriptorImpl implements MapFieldNameDescriptor { + @Override + public String getJsonName() { + return "map_fieldname"; + } + + @Expose + @SerializedName("new_fieldname") + private String newFieldName; + + @Override + public String getNewFieldName() { + return newFieldName; + } + + public void setNewFieldName(String newFieldName) { + this.newFieldName = newFieldName; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java new file mode 100644 index 0000000..599e152 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor; + +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class MapFieldValueDescriptorImpl implements MapFieldValueDescriptor { + @Override + public String getJsonName() { + return "map_fieldvalue"; + } + + @Expose + @SerializedName("pre_value") + private String preValue; + + @Expose + @SerializedName("post_value") + private String postValue; + + @Override + public String getPreValue() { + return preValue; + } + + public void setPreValue(String preValue) { + this.preValue = preValue; + } + + @Override + public String getPostValue() { + return postValue; + } + + public void setPostValue(String postValue) { + this.postValue = postValue; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java new file mode 100644 index 0000000..32aded8 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; + +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonSerializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSerializationContext; + +public class PostMapValuesAdapter implements JsonDeserializer>, JsonSerializer> { + @Override + public List deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) { + List vals = new ArrayList<>(); + if (json.isJsonArray()) { + for (JsonElement e : json.getAsJsonArray()) { + vals.add(createPostMapValues(e, context)); + } + } else if (json.isJsonObject()) { + vals.add(createPostMapValues(json, context)); + } else { + throw new RuntimeException("Unexpected JSON type: " + json.getClass()); + } + return vals; + } + + private PostMapValuesImpl createPostMapValues(JsonElement e, JsonDeserializationContext context) { + List mappers = new ArrayList<>(); + for (Map.Entry m : e.getAsJsonObject().entrySet()) { + switch (m.getKey()) { + case "map_date": + mappers.add((MapDateDescriptorImpl)context.deserialize(m.getValue(), MapDateDescriptorImpl.class)); + break; + case "map_fieldcopy": + mappers.add((MapFieldCopyDescriptorImpl)context.deserialize(m.getValue(), MapFieldCopyDescriptorImpl.class)); + break; + case "map_fieldname": + mappers.add((MapFieldNameDescriptorImpl)context.deserialize(m.getValue(), MapFieldNameDescriptorImpl.class)); + break; + case "map_fieldvalue": + mappers.add((MapFieldValueDescriptorImpl)context.deserialize(m.getValue(), MapFieldValueDescriptorImpl.class)); + break; + default: + System.out.println("Unknown key: " + m.getKey()); + } + } + + PostMapValuesImpl postMapValues = new PostMapValuesImpl(); + postMapValues.setMappers(mappers); + return postMapValues; + } + + @Override + public JsonElement serialize(List src, Type typeOfSrc, JsonSerializationContext context) { + if (src.size() == 1) { + return createMapperObject(src.get(0), context); + } else { + JsonArray jsonArray = new JsonArray(); + for (PostMapValuesImpl postMapValues : src) { + jsonArray.add(createMapperObject(postMapValues, context)); + } + return jsonArray; + } + } + + private JsonElement createMapperObject(PostMapValuesImpl postMapValues, JsonSerializationContext context) { + JsonObject jsonObject = new JsonObject(); + for (MapFieldDescriptor m : postMapValues.getMappers()) { + jsonObject.add(((MapFieldDescriptor)m).getJsonName(), context.serialize(m)); + } + return jsonObject; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java new file mode 100644 index 0000000..4d2254a --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl; + +import java.util.List; + +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues; + +import com.google.gson.annotations.Expose; + +public class PostMapValuesImpl implements PostMapValues { + @Expose + private List mappers; + + public List getMappers() { + return mappers; + } + + public void setMappers(List mappers) { + this.mappers = mappers; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index c853f42..8d7c69f 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -34,7 +34,7 @@ import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.metrics.MetricsManager; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.SSLUtil; -import org.apache.curator.shaded.com.google.common.collect.Maps; +import com.google.common.collect.Maps; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java index 68897e8..cfcc199 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java @@ -20,54 +20,19 @@ package org.apache.ambari.logfeeder.common; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Logger; import org.apache.log4j.Priority; -public abstract class ConfigBlock { - private static final Logger LOG = Logger.getLogger(ConfigBlock.class); - - private boolean drain = false; - +public abstract class ConfigBlock extends ConfigItem { protected Map configs; protected Map contextFields = new HashMap(); - public MetricData statMetric = new MetricData(getStatMetricName(), false); - protected String getStatMetricName() { - return null; - } - public ConfigBlock() { } - /** - * Used while logging. Keep it short and meaningful - */ - public abstract String getShortDescription(); - - /** - * Every implementor need to give name to the thread they create - */ - public String getNameForThread() { - return this.getClass().getSimpleName(); - } - - public void addMetricsContainers(List metricsList) { - metricsList.add(statMetric); - } - - /** - * This method needs to be overwritten by deriving classes. - */ - public void init() throws Exception { - } - public void loadConfig(Map map) { configs = LogFeederUtil.cloneObject(map); @@ -81,46 +46,6 @@ public abstract class ConfigBlock { return configs; } - @SuppressWarnings("unchecked") - public boolean isEnabled() { - boolean isEnabled = getBooleanValue("is_enabled", true); - if (isEnabled) { - // Let's check for static conditions - Map conditions = (Map) configs.get("conditions"); - boolean allow = true; - if (MapUtils.isNotEmpty(conditions)) { - allow = false; - for (String conditionType : conditions.keySet()) { - if (conditionType.equalsIgnoreCase("fields")) { - Map fields = (Map) conditions.get("fields"); - for (String fieldName : fields.keySet()) { - Object values = fields.get(fieldName); - if (values instanceof String) { - allow = isFieldConditionMatch(fieldName, (String) values); - } else { - List listValues = (List) values; - for (String stringValue : listValues) { - allow = isFieldConditionMatch(fieldName, stringValue); - if (allow) { - break; - } - } - } - if (allow) { - break; - } - } - } - if (allow) { - break; - } - } - isEnabled = allow; - } - } - return isEnabled; - } - public boolean isFieldConditionMatch(String fieldName, String stringValue) { boolean allow = false; String fieldValue = (String) configs.get(fieldName); @@ -207,27 +132,17 @@ public abstract class ConfigBlock { return retValue; } - public Map getContextFields() { - return contextFields; - } - - public void incrementStat(int count) { - statMetric.value += count; - } - - public void logStatForMetric(MetricData metric, String prefixStr) { - LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription()); + @Override + public boolean isEnabled() { + return getBooleanValue("is_enabled", true); } - public synchronized void logStat() { - logStatForMetric(statMetric, "Stat"); + public Map getContextFields() { + return contextFields; } public boolean logConfigs(Priority level) { - if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) { - return false; - } - if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) { + if (!super.logConfigs(level)) { return false; } LOG.log(level, "Printing configuration Block=" + getShortDescription()); @@ -235,12 +150,4 @@ public abstract class ConfigBlock { LOG.log(level, "contextFields=" + contextFields); return true; } - - public boolean isDrain() { - return drain; - } - - public void setDrain(boolean drain) { - this.drain = drain; - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index effe980..726ff27 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -46,13 +46,19 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterDescriptorImpl; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import com.google.common.collect.ImmutableMap; import com.google.gson.reflect.TypeToken; public class ConfigHandler implements InputConfigMonitor { @@ -61,10 +67,11 @@ public class ConfigHandler implements InputConfigMonitor { private final OutputManager outputManager = new OutputManager(); private final InputManager inputManager = new InputManager(); - public static Map globalConfigs = new HashMap<>(); + private final Map globalConfigs = new HashMap<>(); + private final List globalConfigJsons = new ArrayList(); - private final List> inputConfigList = new ArrayList<>(); - private final List> filterConfigList = new ArrayList<>(); + private final List inputConfigList = new ArrayList<>(); + private final List filterConfigList = new ArrayList<>(); private final List> outputConfigList = new ArrayList<>(); private boolean simulateMode = false; @@ -141,11 +148,12 @@ public class ConfigHandler implements InputConfigMonitor { } @Override - public void loadInputConfigs(String serviceName, String inputConfigData) throws Exception { + public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception { inputConfigList.clear(); filterConfigList.clear(); - loadConfigs(inputConfigData); + inputConfigList.addAll(inputConfig.getInput()); + filterConfigList.addAll(inputConfig.getFilter()); if (simulateMode) { InputSimulate.loadTypeToFilePath(inputConfigList); @@ -173,14 +181,7 @@ public class ConfigHandler implements InputConfigMonitor { switch (key) { case "global" : globalConfigs.putAll((Map) configMap.get(key)); - break; - case "input" : - List> inputConfig = (List>) configMap.get(key); - inputConfigList.addAll(inputConfig); - break; - case "filter" : - List> filterConfig = (List>) configMap.get(key); - filterConfigList.addAll(filterConfig); + globalConfigJsons.add(configData); break; case "output" : List> outputConfig = (List>) configMap.get(key); @@ -192,21 +193,28 @@ public class ConfigHandler implements InputConfigMonitor { } } + @Override + public List getGlobalConfigJsons() { + return globalConfigJsons; + } + private void simulateIfNeeded() throws Exception { int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0); if (simulatedInputNumber == 0) return; - List> simulateInputConfigList = new ArrayList<>(); + InputConfigImpl simulateInputConfig = new InputConfigImpl(); + List inputConfigDescriptors = new ArrayList<>(); + simulateInputConfig.setInput(inputConfigDescriptors); + simulateInputConfig.setFilter(new ArrayList()); for (int i = 0; i < simulatedInputNumber; i++) { - HashMap mapList = new HashMap(); - mapList.put("source", "simulate"); - mapList.put("rowtype", "service"); - simulateInputConfigList.add(mapList); + InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {}; + inputDescriptor.setSource("simulate"); + inputDescriptor.setRowtype("service"); + inputDescriptor.setAddFields(new HashMap()); + inputConfigDescriptors.add(inputDescriptor); } - Map>> simulateInputConfigMap = ImmutableMap.of("input", simulateInputConfigList); - String simulateInputConfig = LogFeederUtil.getGson().toJson(simulateInputConfigMap); loadInputConfigs("Simulation", simulateInputConfig); simulateMode = true; @@ -233,7 +241,7 @@ public class ConfigHandler implements InputConfigMonitor { output.loadConfig(map); // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input - if (output.getBooleanValue("is_enabled", true)) { + if (output.isEnabled()) { output.logConfigs(Level.INFO); outputManager.add(output); } else { @@ -243,24 +251,23 @@ public class ConfigHandler implements InputConfigMonitor { } private void loadInputs(String serviceName) { - for (Map map : inputConfigList) { - if (map == null) { + for (InputDescriptor inputDescriptor : inputConfigList) { + if (inputDescriptor == null) { continue; } - mergeBlocks(globalConfigs, map); - String value = (String) map.get("source"); - if (StringUtils.isEmpty(value)) { + String source = (String) inputDescriptor.getSource(); + if (StringUtils.isEmpty(source)) { LOG.error("Input block doesn't have source element"); continue; } - Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT); + Input input = (Input) AliasUtil.getClassInstance(source, AliasType.INPUT); if (input == null) { LOG.error("Input object could not be found"); continue; } - input.setType(value); - input.loadConfig(map); + input.setType(source); + input.loadConfig(inputDescriptor); if (input.isEnabled()) { input.setOutputManager(outputManager); @@ -278,13 +285,20 @@ public class ConfigHandler implements InputConfigMonitor { List toRemoveInputList = new ArrayList(); for (Input input : inputManager.getInputList(serviceName)) { - for (Map map : filterConfigList) { - if (map == null) { + for (FilterDescriptor filterDescriptor : filterConfigList) { + if (filterDescriptor == null) { + continue; + } + if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) { + LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled"); + continue; + } + if (!input.isFilterRequired(filterDescriptor)) { + LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription()); continue; } - mergeBlocks(globalConfigs, map); - String value = (String) map.get("filter"); + String value = filterDescriptor.getFilter(); if (StringUtils.isEmpty(value)) { LOG.error("Filter block doesn't have filter element"); continue; @@ -294,16 +308,12 @@ public class ConfigHandler implements InputConfigMonitor { LOG.error("Filter object could not be found"); continue; } - filter.loadConfig(map); + filter.loadConfig(filterDescriptor); filter.setInput(input); - if (filter.isEnabled()) { - filter.setOutputManager(outputManager); - input.addFilter(filter); - filter.logConfigs(Level.INFO); - } else { - LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription()); - } + filter.setOutputManager(outputManager); + input.addFilter(filter); + filter.logConfigs(Level.INFO); } if (input.getFirstFilter() == null) { @@ -318,43 +328,25 @@ public class ConfigHandler implements InputConfigMonitor { } private void sortFilters() { - Collections.sort(filterConfigList, new Comparator>() { - + Collections.sort(filterConfigList, new Comparator() { @Override - public int compare(Map o1, Map o2) { - Object o1Sort = o1.get("sort_order"); - Object o2Sort = o2.get("sort_order"); + public int compare(FilterDescriptor o1, FilterDescriptor o2) { + Integer o1Sort = o1.getSortOrder(); + Integer o2Sort = o2.getSortOrder(); if (o1Sort == null || o2Sort == null) { return 0; } - int o1Value = parseSort(o1, o1Sort); - int o2Value = parseSort(o2, o2Sort); - - return o1Value - o2Value; - } - - private int parseSort(Map map, Object o) { - if (!(o instanceof Number)) { - try { - return (new Double(Double.parseDouble(o.toString()))).intValue(); - } catch (Throwable t) { - LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString() - + ", map=" + map.toString()); - return 0; - } - } else { - return ((Number) o).intValue(); - } + return o1Sort - o2Sort; } - }); + } ); } private void assignOutputsToInputs(String serviceName) { Set usedOutputSet = new HashSet(); for (Input input : inputManager.getInputList(serviceName)) { for (Output output : outputManager.getOutputs()) { - if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) { + if (input.isOutputRequired(output)) { usedOutputSet.add(output); input.addOutput(output); } http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java new file mode 100644 index 0000000..5c20a8e --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.common; + +import java.util.List; + +import org.apache.ambari.logfeeder.metrics.MetricData; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.log4j.Logger; +import org.apache.log4j.Priority; + +public abstract class ConfigItem { + + protected static final Logger LOG = Logger.getLogger(ConfigBlock.class); + private boolean drain = false; + public MetricData statMetric = new MetricData(getStatMetricName(), false); + + public ConfigItem() { + super(); + } + + protected String getStatMetricName() { + return null; + } + + /** + * Used while logging. Keep it short and meaningful + */ + public abstract String getShortDescription(); + + /** + * Every implementor need to give name to the thread they create + */ + public String getNameForThread() { + return this.getClass().getSimpleName(); + } + + public void addMetricsContainers(List metricsList) { + metricsList.add(statMetric); + } + + /** + * This method needs to be overwritten by deriving classes. + */ + public void init() throws Exception { + } + + public abstract boolean isEnabled(); + + public void incrementStat(int count) { + statMetric.value += count; + } + + public void logStatForMetric(MetricData metric, String prefixStr) { + LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription()); + } + + public synchronized void logStat() { + logStatForMetric(statMetric, "Stat"); + } + + public boolean logConfigs(Priority level) { + if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) { + return false; + } + if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) { + return false; + } + return true; + } + + public boolean isDrain() { + return drain; + } + + public void setDrain(boolean drain) { + this.drain = drain; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java index afd903e..fd02497 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.common.ConfigBlock; +import org.apache.ambari.logfeeder.common.ConfigItem; import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; @@ -33,18 +33,28 @@ import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logfeeder.util.AliasUtil; import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; -import org.apache.log4j.Logger; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues; +import org.apache.commons.lang.BooleanUtils; import org.apache.log4j.Priority; -public abstract class Filter extends ConfigBlock { - private static final Logger LOG = Logger.getLogger(Filter.class); - +public abstract class Filter extends ConfigItem { + protected FilterDescriptor filterDescriptor; protected Input input; private Filter nextFilter = null; private OutputManager outputManager; private Map> postFieldValueMappers = new HashMap>(); + public void loadConfig(FilterDescriptor filterDescriptor) { + this.filterDescriptor = filterDescriptor; + } + + public FilterDescriptor getFilterDescriptor() { + return filterDescriptor; + } + @Override public void init() throws Exception { super.init(); @@ -55,28 +65,22 @@ public abstract class Filter extends ConfigBlock { } } - @SuppressWarnings("unchecked") private void initializePostMapValues() { - Map postMapValues = (Map) getConfigValue("post_map_values"); + Map> postMapValues = filterDescriptor.getPostMapValues(); if (postMapValues == null) { return; } for (String fieldName : postMapValues.keySet()) { - List> mapList = null; - Object values = postMapValues.get(fieldName); - if (values instanceof List) { - mapList = (List>) values; - } else { - mapList = new ArrayList>(); - mapList.add((Map) values); - } - for (Map mapObject : mapList) { - for (String mapClassCode : mapObject.keySet()) { + List values = postMapValues.get(fieldName); + for (PostMapValues pmv : values) { + for (MapFieldDescriptor mapFieldDescriptor : pmv.getMappers()) { + String mapClassCode = mapFieldDescriptor.getJsonName(); Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER); if (mapper == null) { - break; + LOG.warn("Unknown mapper type: " + mapClassCode); + continue; } - if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapObject.get(mapClassCode))) { + if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) { List fieldMapList = postFieldValueMappers.get(fieldName); if (fieldMapList == null) { fieldMapList = new ArrayList(); @@ -156,15 +160,8 @@ public abstract class Filter extends ConfigBlock { } @Override - public boolean isFieldConditionMatch(String fieldName, String stringValue) { - if (!super.isFieldConditionMatch(fieldName, stringValue)) { - if (input != null) { - return input.isFieldConditionMatch(fieldName, stringValue); - } else { - return false; - } - } - return true; + public boolean isEnabled() { + return BooleanUtils.isNotFalse(filterDescriptor.isEnabled()); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 7e2da70..70aea65 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -38,6 +38,8 @@ import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor; +import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -75,11 +77,10 @@ public class FilterGrok extends Filter { super.init(); try { - messagePattern = escapePattern(getStringValue("message_pattern")); - multilinePattern = escapePattern(getStringValue("multiline_pattern")); - sourceField = getStringValue("source_field"); - removeSourceField = getBooleanValue("remove_source_field", - removeSourceField); + messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern()); + multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern()); + sourceField = ((FilterGrokDescriptor)filterDescriptor).getSourceField(); + removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField); LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " + getShortDescription()); http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java index 35f692e..cfccdeb 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java @@ -25,12 +25,9 @@ import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.DateUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.log4j.Logger; public class FilterJSON extends Filter { - private static final Logger LOG = Logger.getLogger(FilterJSON.class); - @Override public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException { Map jsonMap = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java index b04a439..f2a4186 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java @@ -28,13 +28,11 @@ import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; -import org.apache.log4j.Logger; public class FilterKeyValue extends Filter { - private static final Logger LOG = Logger.getLogger(FilterKeyValue.class); - private String sourceField = null; private String valueSplit = "="; private String fieldSplit = "\t"; @@ -46,10 +44,10 @@ public class FilterKeyValue extends Filter { public void init() throws Exception { super.init(); - sourceField = getStringValue("source_field"); - valueSplit = getStringValue("value_split", valueSplit); - fieldSplit = getStringValue("field_split", fieldSplit); - valueBorders = getStringValue("value_borders"); + sourceField = filterDescriptor.getSourceField(); + valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit); + fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getFieldSplit(), fieldSplit); + valueBorders = ((FilterKeyValueDescriptor)filterDescriptor).getValueBorders(); LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" + fieldSplit + ", " + getShortDescription()); http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index 41a1fa5..cfa1903 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -29,14 +29,14 @@ import java.util.HashMap; import java.util.Map; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; +import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang.ObjectUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; -import org.apache.log4j.Logger; public abstract class AbstractInputFile extends Input { - protected static final Logger LOG = Logger.getLogger(AbstractInputFile.class); - private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000; protected File[] logFiles; @@ -73,16 +73,16 @@ public abstract class AbstractInputFile extends Input { // Let's close the file and set it to true after we start monitoring it setClosed(true); - logPath = getStringValue("path"); - tail = getBooleanValue("tail", tail); - checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS); + logPath = inputDescriptor.getPath(); + tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), tail); + checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS); if (StringUtils.isEmpty(logPath)) { LOG.error("path is empty for file input. " + getShortDescription()); return; } - String startPosition = getStringValue("start_position"); + String startPosition = inputDescriptor.getStartPosition(); if (StringUtils.isEmpty(startPosition) || startPosition.equalsIgnoreCase("beginning") || startPosition.equalsIgnoreCase("begining") || !tail) { isStartFromBegining = true; @@ -313,7 +313,7 @@ public abstract class AbstractInputFile extends Input { @Override public String getShortDescription() { - return "input:source=" + getStringValue("source") + ", path=" + + return "input:source=" + inputDescriptor.getSource() + ", path=" + (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 9f54d8a..fba596d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -21,23 +21,25 @@ package org.apache.ambari.logfeeder.input; import java.io.File; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.ambari.logfeeder.input.cache.LRUCache; -import org.apache.ambari.logfeeder.common.ConfigBlock; +import org.apache.ambari.logfeeder.common.ConfigItem; import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.log4j.Logger; - -public abstract class Input extends ConfigBlock implements Runnable { - private static final Logger LOG = Logger.getLogger(Input.class); - +import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions; +import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields; +import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; +import org.apache.commons.lang.BooleanUtils; +import org.apache.log4j.Priority; + +public abstract class Input extends ConfigItem implements Runnable { private static final boolean DEFAULT_TAIL = true; private static final boolean DEFAULT_USE_EVENT_MD5 = false; private static final boolean DEFAULT_GEN_EVENT_MD5 = true; @@ -47,12 +49,8 @@ public abstract class Input extends ConfigBlock implements Runnable { private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000; private static final String DEFAULT_CACHE_KEY_FIELD = "log_message"; - private static final String CACHE_ENABLED = "cache_enabled"; - private static final String CACHE_KEY_FIELD = "cache_key_field"; - private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled"; - private static final String CACHE_SIZE = "cache_size"; - private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval"; - + protected InputDescriptor inputDescriptor; + protected InputManager inputManager; protected OutputManager outputManager; private List outputList = new ArrayList(); @@ -75,21 +73,12 @@ public abstract class Input extends ConfigBlock implements Runnable { return null; } - @Override - public void loadConfig(Map map) { - super.loadConfig(map); - String typeValue = getStringValue("type"); - if (typeValue != null) { - // Explicitly add type and value to field list - contextFields.put("type", typeValue); - @SuppressWarnings("unchecked") - Map addFields = (Map) map.get("add_fields"); - if (addFields == null) { - addFields = new HashMap(); - map.put("add_fields", addFields); - } - addFields.put("type", typeValue); - } + public void loadConfig(InputDescriptor inputDescriptor) { + this.inputDescriptor = inputDescriptor; + } + + public InputDescriptor getInputDescriptor() { + return inputDescriptor; } public void setType(String type) { @@ -104,6 +93,12 @@ public abstract class Input extends ConfigBlock implements Runnable { this.outputManager = outputManager; } + public boolean isFilterRequired(FilterDescriptor filterDescriptor) { + Conditions conditions = filterDescriptor.getConditions(); + Fields fields = conditions.getFields(); + return fields.getType().contains(inputDescriptor.getType()); + } + public void addFilter(Filter filter) { if (firstFilter == null) { firstFilter = filter; @@ -116,6 +111,22 @@ public abstract class Input extends ConfigBlock implements Runnable { } } + @SuppressWarnings("unchecked") + public boolean isOutputRequired(Output output) { + Map conditions = (Map) output.getConfigs().get("conditions"); + if (conditions == null) { + return false; + } + + Map fields = (Map) conditions.get("fields"); + if (fields == null) { + return false; + } + + List types = (List) fields.get("rowtype"); + return types.contains(inputDescriptor.getRowtype()); + } + public void addOutput(Output output) { outputList.add(output); } @@ -124,9 +135,9 @@ public abstract class Input extends ConfigBlock implements Runnable { public void init() throws Exception { super.init(); initCache(); - tail = getBooleanValue("tail", DEFAULT_TAIL); - useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5); - genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5); + tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL); + useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5); + genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5); if (firstFilter != null) { firstFilter.init(); @@ -236,26 +247,26 @@ public abstract class Input extends ConfigBlock implements Runnable { } private void initCache() { - boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null - ? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED) + boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null + ? inputDescriptor.isCacheEnabled() : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED); if (cacheEnabled) { - String cacheKeyField = getConfigValue(CACHE_KEY_FIELD) != null - ? getStringValue(CACHE_KEY_FIELD) + String cacheKeyField = inputDescriptor.getCacheKeyField() != null + ? inputDescriptor.getCacheKeyField() : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD); - setCacheKeyField(getStringValue(cacheKeyField)); + setCacheKeyField(cacheKeyField); - boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null - ? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST) + boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null + ? inputDescriptor.getCacheLastDedupEnabled() : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST); - int cacheSize = getConfigValue(CACHE_SIZE) != null - ? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE) + int cacheSize = inputDescriptor.getCacheSize() != null + ? inputDescriptor.getCacheSize() : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE); - long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null - ? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL) + long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null + ? inputDescriptor.getCacheDedupInterval() : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL))); setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled)); @@ -319,6 +330,11 @@ public abstract class Input extends ConfigBlock implements Runnable { } @Override + public boolean isEnabled() { + return BooleanUtils.isNotFalse(inputDescriptor.isEnabled()); + } + + @Override public String getNameForThread() { if (filePath != null) { try { @@ -331,7 +347,17 @@ public abstract class Input extends ConfigBlock implements Runnable { } @Override + public boolean logConfigs(Priority level) { + if (!super.logConfigs(level)) { + return false; + } + LOG.log(level, "Printing Input=" + getShortDescription()); + LOG.log(level, "description=" + inputDescriptor.getPath()); + return true; + } + + @Override public String toString() { return getShortDescription(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index 3737839..fc40ca4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -25,7 +25,9 @@ import java.io.FileNotFoundException; import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory; import org.apache.ambari.logfeeder.util.FileUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor; import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.solr.common.util.Base64; @@ -62,7 +64,7 @@ public class InputFile extends AbstractInputFile { @Override void start() throws Exception { - boolean isProcessFile = getBooleanValue("process_file", true); + boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true); if (isProcessFile) { if (tail) { processFile(logFiles[0]); @@ -100,7 +102,7 @@ public class InputFile extends AbstractInputFile { } private void copyFiles(File[] files) { - boolean isCopyFile = getBooleanValue("copy_file", false); + boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getCopyFile(), false); if (isCopyFile && files != null) { for (File file : files) { try { http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java index f560379..4bf162b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import org.apache.ambari.logfeeder.util.S3Util; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor; import org.apache.commons.lang.ArrayUtils; import org.apache.solr.common.util.Base64; @@ -78,8 +79,8 @@ public class InputS3File extends AbstractInputFile { @Override protected BufferedReader openLogFile(File logPathFile) throws IOException { - String s3AccessKey = getStringValue("s3_access_key"); - String s3SecretKey = getStringValue("s3_secret_key"); + String s3AccessKey = ((InputS3FileDescriptor)inputDescriptor).getS3AccessKey(); + String s3SecretKey = ((InputS3FileDescriptor)inputDescriptor).getS3SecretKey(); BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey); fileKey = getFileKey(logPathFile); base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes()); http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index d193cdb..5e7bdb3 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.input; import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -35,25 +34,23 @@ import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.filter.FilterJSON; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl; +import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl; import org.apache.commons.collections.MapUtils; -import org.apache.log4j.Logger; import org.apache.solr.common.util.Base64; import com.google.common.base.Joiner; public class InputSimulate extends Input { - private static final Logger LOG = Logger.getLogger(InputSimulate.class); - private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}"; private static final Map typeToFilePath = new HashMap<>(); - private static List inputTypes = new ArrayList<>(); - public static void loadTypeToFilePath(List> inputList) { - for (Map input : inputList) { - if (input.containsKey("type") && input.containsKey("path")) { - typeToFilePath.put((String)input.get("type"), (String)input.get("path")); - inputTypes.add((String)input.get("type")); - } + private static final List inputTypes = new ArrayList<>(); + public static void loadTypeToFilePath(List inputList) { + for (InputDescriptor input : inputList) { + typeToFilePath.put(input.getType(), input.getPath()); + inputTypes.add(input.getType()); } } @@ -86,7 +83,7 @@ public class InputSimulate extends Input { this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName; Filter filter = new FilterJSON(); - filter.loadConfig(Collections. emptyMap()); + filter.loadConfig(new FilterJsonDescriptorImpl()); filter.setInput(this); addFilter(filter); } @@ -141,7 +138,7 @@ public class InputSimulate extends Input { String type = types.get(typePos); String filePath = MapUtils.getString(typeToFilePath, type, "path of " + type); - configs.put("type", type); + ((InputDescriptorImpl)inputDescriptor).setType(type); setFilePath(filePath); return type; http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java index 1f635af..6173f53 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java @@ -44,7 +44,7 @@ public enum FilterLogData { } public boolean isAllowed(Map jsonObj, InputMarker inputMarker) { - if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE))) + if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype())) return true; boolean isAllowed = applyFilter(jsonObj); http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java index 96709c0..5facf76 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java @@ -21,12 +21,14 @@ package org.apache.ambari.logfeeder.mapper; import java.util.Map; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; + public abstract class Mapper { private String inputDesc; protected String fieldName; private String mapClassCode; - public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs); + public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor); protected void init(String inputDesc, String fieldName, String mapClassCode) { this.inputDesc = inputDesc; http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java index 6a7fad7..5d34c06 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java @@ -26,6 +26,8 @@ import java.util.Map; import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; @@ -39,18 +41,11 @@ public class MapperDate extends Mapper { private SimpleDateFormat srcDateFormatter=null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) { + public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); - if (!(mapConfigs instanceof Map)) { - LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName() + - ", map=" + this); - return false; - } - @SuppressWarnings("unchecked") - Map mapObjects = (Map) mapConfigs; - String targetDateFormat = (String) mapObjects.get("target_date_pattern"); - String srcDateFormat = (String) mapObjects.get("src_date_pattern"); + String targetDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getTargetDatePattern(); + String srcDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getSourceDatePattern(); if (StringUtils.isEmpty(targetDateFormat)) { LOG.fatal("Date format for map is empty. " + this); } else { http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java index 39e1ff4..a463f49 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java @@ -21,6 +21,8 @@ package org.apache.ambari.logfeeder.mapper; import java.util.Map; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; @@ -33,16 +35,9 @@ public class MapperFieldCopy extends Mapper { private String copyName = null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) { + public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); - if (!(mapConfigs instanceof Map)) { - LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName()); - return false; - } - - @SuppressWarnings("unchecked") - Map mapObjects = (Map) mapConfigs; - copyName = (String) mapObjects.get("copy_name"); + copyName = ((MapFieldCopyDescriptor)mapFieldDescriptor).getCopyName(); if (StringUtils.isEmpty(copyName)) { LOG.fatal("Map copy name is empty."); return false; http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java index 9b6e83c..3f160da 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java @@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper; import java.util.Map; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -35,16 +37,10 @@ public class MapperFieldName extends Mapper { private String newValue = null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) { + public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); - if (!(mapConfigs instanceof Map)) { - LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName()); - return false; - } - - @SuppressWarnings("unchecked") - Map mapObjects = (Map) mapConfigs; - newValue = (String) mapObjects.get("new_fieldname"); + + newValue = ((MapFieldNameDescriptor)mapFieldDescriptor).getNewFieldName(); if (StringUtils.isEmpty(newValue)) { LOG.fatal("Map field value is empty."); return false; http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java index 87cda65..03ff95b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java @@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper; import java.util.Map; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -36,17 +38,11 @@ public class MapperFieldValue extends Mapper { private String newValue = null; @Override - public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) { + public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) { init(inputDesc, fieldName, mapClassCode); - if (!(mapConfigs instanceof Map)) { - LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName()); - return false; - } - @SuppressWarnings("unchecked") - Map mapObjects = (Map) mapConfigs; - prevValue = (String) mapObjects.get("pre_value"); - newValue = (String) mapObjects.get("post_value"); + prevValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPreValue(); + newValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPostValue();; if (StringUtils.isEmpty(newValue)) { LOG.fatal("Map field value is empty."); return false; http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java index bc6a553..65b9e19 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java @@ -28,11 +28,8 @@ import org.apache.ambari.logfeeder.common.ConfigBlock; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.log4j.Logger; public abstract class Output extends ConfigBlock { - private static final Logger LOG = Logger.getLogger(Output.class); - private String destination = null; protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false); http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java index fcf2695..8308a4f 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java @@ -41,7 +41,7 @@ public class OutputLineFilter { public Boolean apply(Map lineMap, Input input) { boolean isLogFilteredOut = false; LRUCache inputLruCache = input.getCache(); - if (inputLruCache != null && "service".equals(input.getConfigs().get(LogFeederConstants.ROW_TYPE))) { + if (inputLruCache != null && "service".equals(input.getInputDescriptor().getRowtype())) { String logMessage = (String) lineMap.get(input.getCacheKeyField()); Long timestamp = null; if (lineMap.containsKey((LogFeederConstants.IN_MEMORY_TIMESTAMP))) {